Reactive Extensions: RxJava

Reactive Extensions (ReactiveX)

I have always been interested in Reactive Extensions (ReactiveX), RxJava, and RxKotlin.
Recently, I enjoyed reading the excellent book Learning RxJava by Thomas Nield; the following are my notes.

Why RxJava?

Quick Start

In ReactiveX, the core type is the Observable, which essentially pushes items. A given Observable<T> pushes items of type T through a series of operators until they arrive at an Observer that consumes them. The following is an example of an Observable<String> that will push three String objects:

import io.reactivex.rxjava3.core.Observable // RxJava 3; RxJava 2 uses io.reactivex.Observable
fun main() {
    val observable = Observable.just("Hello", "world", "!")
}

However, running this main method does nothing other than declare an Observable<String>. To make this Observable actually push (or emit) these three strings, we need an Observer to subscribe to it and receive the items:

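import io.reactivex.rxjava3.core.Observable // RxJava 3; RxJava 2 uses io.reactivex.Observable
fun main() {
    val observable = Observable.just("Hello", "world", "!")
    // The lambda is the Observer: it is called once per emitted item.
    observable.subscribe {
        print("$it ")
    }
}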

This time, the output is the following:

Hello world ! 

What happened here is that our Observable<String> pushed each String object, one at a time, to the Observer (the lambda).
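The single lambda passed to subscribe() only handles emitted items. As a rough sketch of what an Observer can react to, the following uses the three-lambda overload of subscribe() to record item, error, and completion events. The import assumes RxJava 3 (RxJava 2 uses io.reactivex.Observable), and collectEvents is a hypothetical helper name used only for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 on the classpath

// Hypothetical helper: subscribes and records every event the Observer receives.
fun collectEvents(): List<String> {
    val events = mutableListOf<String>()
    Observable.just("Hello", "world", "!").subscribe(
        { item -> events.add("onNext: $item") },    // called once per pushed item
        { error -> events.add("onError: $error") }, // called if the source fails
        { events.add("onComplete") }                // called once after the last item
    )
    return events
}

fun main() {
    collectEvents().forEach(::println)
}
```

Because Observable.just() emits synchronously on the calling thread, all events have been recorded by the time subscribe() returns.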

Operators can be chained between the Observable and the Observer to transform or otherwise manipulate each pushed item. The following example uses map():

import io.reactivex.rxjava3.core.Observable // RxJava 3; RxJava 2 uses io.reactivex.Observable
fun main() {
    Observable.just("Hello", "world", "!").map { it.uppercase() }.subscribe { print("$it ") }
}

The output should be:

HELLO WORLD ! 

RxJava vs Java 8 Streams

How is an Observable different from Java 8 Streams or Kotlin sequences? The key difference is that an Observable pushes items to its consumer, while Streams and sequences wait for the consumer to pull items from them.
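To make the push/pull distinction concrete, here is a minimal sketch that logs who drives the flow in each case. With a Kotlin sequence, the consumer asks the iterator for each item; with an Observable, the source calls the Observer. The function names pullFromSequence and pushFromObservable are hypothetical, and the import assumes RxJava 3 (RxJava 2 uses io.reactivex.Observable).

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 on the classpath

// Pull: the consumer decides when (and whether) the next item is produced.
fun pullFromSequence(): List<String> {
    val log = mutableListOf<String>()
    val iterator = sequenceOf("Hello", "world", "!")
        .map { log.add("produce $it"); it } // runs only when next() is called
        .iterator()
    repeat(2) {
        log.add("consumer asks for an item")
        log.add("consumer got ${iterator.next()}")
    }
    return log // "!" is never produced because the consumer never asked for it
}

// Push: the source drives the loop and hands each item to the Observer.
fun pushFromObservable(): List<String> {
    val log = mutableListOf<String>()
    val source = Observable.create<String> { emitter ->
        for (word in listOf("Hello", "world", "!")) {
            log.add("source pushes $word")
            emitter.onNext(word)
        }
        emitter.onComplete()
    }
    source.subscribe { log.add("observer received $it") }
    return log
}

fun main() {
    pullFromSequence().forEach(::println)
    pushFromObservable().forEach(::println)
}
```

In the pull version the third item is never computed because the consumer stops asking. In the push version the Observer has no such control in this sketch and simply receives whatever the source emits.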

Advanced RxJava

The following are more detailed notes for a deeper understanding of RxJava:



