RxJava: Concurrency

Concurrency means having multiple tasks in progress at the same time, but not necessarily executing simultaneously.

The following are two tasks which will run sequentially:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .map { intenseCalculation(it) }
        .subscribe { println(it) }

    Observable.range(1, 6)
        .map { s -> intenseCalculation(s) }
        .subscribe { println(it) }
}
Alpha
Beta
Gamma
Delta
Epsilon
1
2
3
4
5
6
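
The examples in this post call intenseCalculation(), which is not defined here. A minimal sketch, assuming it only simulates a slow computation by sleeping for a random time (up to three seconds, an arbitrary choice) and then returns its argument unchanged:

```kotlin
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit

// Hypothetical helper: the original implementation is not shown in this post.
// It simulates expensive work by blocking the calling thread for a random
// delay, then returns the value it was given.
fun <T> intenseCalculation(value: T): T {
    val delayMillis = ThreadLocalRandom.current().nextLong(3000) // 0 until 3000 ms
    TimeUnit.MILLISECONDS.sleep(delayMillis)
    return value
}
```

Because the helper blocks whichever thread calls it, it makes it easy to see which thread is doing the work in the examples that follow.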

Using the subscribeOn() operator makes the same tasks run concurrently (the exact interleaving of the output may differ between runs):

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())
        .map { intenseCalculation(it) }
        .subscribe { println(it) }

    Observable.range(1, 6)
        .subscribeOn(Schedulers.computation())
        .map { s -> intenseCalculation(s) }
        .subscribe { println(it) }

    // Keep the main thread alive while the computation threads (daemon threads) do their work
    TimeUnit.SECONDS.sleep(20)
}
Alpha
1
2
Beta
3
Gamma
4
Delta
5
Epsilon
6

RxJava operators work safely with Observables on different threads. For example, operators and factories that combine multiple Observables (merge(), zip()…) will safely combine emissions pushed by different threads:

fun main() {
    val source1 = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())
        .map { intenseCalculation(it) }

    val source2 = Observable.range(1, 6)
        .subscribeOn(Schedulers.computation())
        .map { s -> intenseCalculation(s) }

    Observable.zip(source1, source2, BiFunction { s1: String, s2: Int -> s1 to s2 })
        .subscribe { println(it) }

    TimeUnit.SECONDS.sleep(20)
}
(Alpha, 1)
(Beta, 2)
(Gamma, 3)
(Delta, 4)
(Epsilon, 5)

Instead of sleep(), it’s possible to use blockingSubscribe(), which keeps the calling thread waiting until the Observable terminates (mostly for testing purposes).
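
As a rough sketch of that alternative, the zip example can be written without any sleep call. The imports assume RxJava 2 (io.reactivex); RxJava 3 uses the io.reactivex.rxjava3 package instead. To stay self-contained, the intenseCalculation() steps are left out, and zipped() is just an illustrative name:

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers

// Same zip as in the previous example, without the intenseCalculation() steps.
fun zipped(): Observable<Pair<String, Int>> {
    val source1 = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())

    val source2 = Observable.range(1, 6)
        .subscribeOn(Schedulers.computation())

    return Observable.zip(
        source1,
        source2,
        BiFunction<String, Int, Pair<String, Int>> { s1, s2 -> s1 to s2 }
    )
}

fun main() {
    // Blocks the main thread until onComplete (or onError), so no sleep is needed.
    // The lambda itself runs on the calling thread, here main.
    zipped().blockingSubscribe { println(it) }
}
```

Since the callback runs on the calling thread, printing the thread name inside blockingSubscribe() would not show the computation threads; a doOnNext() placed before it would.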

Schedulers

Schedulers are mainly thread pools with different policies. Threads may be persisted and maintained so they can be reused. A queue of tasks is then executed by that thread pool.
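
To make the idea concrete, here is a plain java.util.concurrent sketch (not RxJava code) of a small fixed pool working through a queue of tasks. It only illustrates the general pattern; runOnPool() is a made-up name, and the pool size and task count are arbitrary:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Queues `tasks` tasks on a pool of `poolSize` threads and returns the names
// of the threads that ended up executing them.
fun runOnPool(poolSize: Int, tasks: Int): Set<String> {
    val pool = Executors.newFixedThreadPool(poolSize)
    val futures = (1..tasks).map { id ->
        pool.submit(Callable {
            val name = Thread.currentThread().name
            println("task $id on $name")
            name
        })
    }
    // Wait for every task, then release the pool's threads.
    val names = futures.map { it.get() }.toSet()
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    return names
}

fun main() {
    val names = runOnPool(poolSize = 2, tasks = 4)
    println("${names.size} thread(s) served 4 tasks")
}
```

The four tasks are served by at most two threads, which are reused rather than created per task. An existing executor like this one can also be wrapped as a Scheduler with Schedulers.from().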

Operator: subscribeOn()

The subscribeOn() operator can be placed anywhere in the Observable chain to suggest to the upstream source which Scheduler to use.
If that source is not already tied to a particular Scheduler, it will use the specified one. It will then push emissions all the way to the final Observer on that thread (unless observeOn() calls are added).

fun main() {
    val source1 = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())
        .map { intenseCalculation(it) }
        .doOnNext { println("${Thread.currentThread().name} $it") }

    val source2 = Observable.range(1, 6)
        .map { s -> intenseCalculation(s) }
        .subscribeOn(Schedulers.computation())
        .doOnNext { println("${Thread.currentThread().name} $it") }

    val source3 = Observable.just('α', 'β', 'γ', 'δ', 'ε')
        .map { intenseCalculation(it) }
        .doOnNext { println("${Thread.currentThread().name} $it") }
        .subscribeOn(Schedulers.computation())

    Observable.zip(source1, source2, source3, Function3 { s1: String, s2: Int, s3: Char -> Triple(s1, s2, s3) })
        .subscribe { println(it) }

    TimeUnit.SECONDS.sleep(20)
}
RxComputationThreadPool-2 1
RxComputationThreadPool-1 Alpha
RxComputationThreadPool-3 α
(Alpha, 1, α)
RxComputationThreadPool-3 β
RxComputationThreadPool-3 γ
RxComputationThreadPool-1 Beta
RxComputationThreadPool-2 2
(Beta, 2, β)
RxComputationThreadPool-1 Gamma
RxComputationThreadPool-2 3
RxComputationThreadPool-3 δ
(Gamma, 3, γ)
RxComputationThreadPool-3 ε
RxComputationThreadPool-1 Delta
RxComputationThreadPool-2 4
(Delta, 4, δ)
RxComputationThreadPool-1 Epsilon
RxComputationThreadPool-2 5
(Epsilon, 5, ε)
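
The condition about the source not being tied to a Scheduler matters for factories that already run on one. As a small sketch (assuming RxJava 2 imports; intervalThreadNames() is an illustrative name), Observable.interval() emits on the computation Scheduler by default, so the subscribeOn(Schedulers.io()) below should not change where the emissions are produced:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Collects the names of the threads that emit the first three ticks.
fun intervalThreadNames(): List<String> =
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io()) // no effect on emissions: interval() already uses computation()
        .map { Thread.currentThread().name }
        .take(3)
        .toList()
        .blockingGet()

fun main() {
    // Expected to print RxComputationThreadPool names rather than RxCachedThreadScheduler ones.
    intervalThreadNames().forEach { println(it) }
}
```

To move such a source to another Scheduler, the factory's own overload that takes a Scheduler argument is the place to do it, for example Observable.interval(100, TimeUnit.MILLISECONDS, Schedulers.io()).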

When multiple Observers subscribe to the same Observable with subscribeOn(), each one gets its own thread:

fun main() {
    val source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())
        .map { intenseCalculation(it) }

    source.subscribe { println("${Thread.currentThread().name} $it") }
    source.subscribe { println("${Thread.currentThread().name} $it") }

    TimeUnit.SECONDS.sleep(15)
}
RxComputationThreadPool-1 Alpha
RxComputationThreadPool-2 Alpha
RxComputationThreadPool-2 Beta
RxComputationThreadPool-2 Gamma
RxComputationThreadPool-2 Delta
RxComputationThreadPool-1 Beta
RxComputationThreadPool-2 Epsilon
RxComputationThreadPool-1 Gamma
RxComputationThreadPool-1 Delta
RxComputationThreadPool-1 Epsilon

However, it’s possible to use a multicast operator so that a single thread serves multiple Observers:

fun main() {
    val source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())
        .map { intenseCalculation(it) }
        .publish()
        .autoConnect(2)

    source.subscribe { println("${Thread.currentThread().name} $it") }
    source.subscribe { println("${Thread.currentThread().name} $it") }

    TimeUnit.SECONDS.sleep(15)
}
RxComputationThreadPool-1 Alpha
RxComputationThreadPool-1 Alpha
RxComputationThreadPool-1 Beta
RxComputationThreadPool-1 Beta
RxComputationThreadPool-1 Gamma
RxComputationThreadPool-1 Gamma
RxComputationThreadPool-1 Delta
RxComputationThreadPool-1 Delta
RxComputationThreadPool-1 Epsilon
RxComputationThreadPool-1 Epsilon

Operator: observeOn()

The observeOn() operator intercepts emissions at the point where it is called in the Observable chain and switches them to a different Scheduler going forward:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .subscribeOn(Schedulers.computation())
        .doOnNext { println("[doOnNext][${Thread.currentThread().name}] $it") }
        .map { intenseCalculation(it) } // Runs on `Computation`
        .observeOn(Schedulers.io())
        .subscribe { println("[subscribe][${Thread.currentThread().name}] $it") } // Runs on `IO`

    TimeUnit.SECONDS.sleep(10)
}
[doOnNext][RxComputationThreadPool-1] Alpha
[doOnNext][RxComputationThreadPool-1] Beta
[subscribe][RxCachedThreadScheduler-1] Alpha
[doOnNext][RxComputationThreadPool-1] Gamma
[subscribe][RxCachedThreadScheduler-1] Beta
[doOnNext][RxComputationThreadPool-1] Delta
[subscribe][RxCachedThreadScheduler-1] Gamma
[doOnNext][RxComputationThreadPool-1] Epsilon
[subscribe][RxCachedThreadScheduler-1] Delta
[subscribe][RxCachedThreadScheduler-1] Epsilon

This ability to switch Schedulers can be very useful for UI thread events (on Android, JavaFX…): use observeOn() to move UI events to a different Scheduler to do the work and, when the result is ready, move it back to the UI thread with another observeOn().
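
A rough sketch of that round trip, assuming RxJava 2 imports. There is no real UI toolkit here: a single-thread executor named "ui-thread" stands in for the UI thread, and handleEvents() and the "click-…" events are made up for illustration:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

fun handleEvents(): List<String> {
    // Stand-in for a UI thread (hypothetical): one dedicated, named thread.
    val uiExecutor = Executors.newSingleThreadExecutor { r -> Thread(r, "ui-thread") }
    val uiScheduler = Schedulers.from(uiExecutor)
    try {
        return Observable.just("click-1", "click-2", "click-3") // pretend UI events
            .subscribeOn(uiScheduler)
            .observeOn(Schedulers.computation()) // leave the UI thread for the heavy work
            .map { "$it: work on ${Thread.currentThread().name}" }
            .observeOn(uiScheduler)              // come back to the UI thread with the result
            .map { "$it, result on ${Thread.currentThread().name}" }
            .toList()
            .blockingGet()
    } finally {
        uiExecutor.shutdown()
    }
}

fun main() {
    handleEvents().forEach { println(it) }
}
```

In a real application the stand-in Scheduler would be replaced by the one for the platform's UI thread, for instance AndroidSchedulers.mainThread() from RxAndroid.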

Operator: unsubscribeOn()

Disposing an Observable can sometimes be an expensive operation. For instance, if the Observable is emitting the results of a database query, stopping and disposing it can be costly because it needs to shut down the JDBC resources it is using.

Let’s take the following example:

fun main() {
    val disposable = Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose { println("Disposing on thread ${Thread.currentThread().name}") }
        .subscribe { println("Received $it") }

    TimeUnit.SECONDS.sleep(3)
    disposable.dispose()
    TimeUnit.SECONDS.sleep(3)
}
Received 0
Received 1
Received 2
Disposing on thread main

In the previous example, the disposal happens on the main thread. This behaviour might not be desired (on Android, for example), but no worries, the unsubscribeOn() operator is here to the rescue:

fun main() {
    val disposable = Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose { println("Disposing on thread ${Thread.currentThread().name}") }
        .unsubscribeOn(Schedulers.io())
        .subscribe { println("Received $it") }

    TimeUnit.SECONDS.sleep(3)
    disposable.dispose()
    TimeUnit.SECONDS.sleep(3)
}
Received 0
Received 1
Received 2
Disposing on thread RxCachedThreadScheduler-1

Now the disposal happens on an IO thread.

Note: unsubscribeOn() should not be used for lightweight operations as it adds unnecessary overhead.
It’s possible to use multiple unsubscribeOn() calls to target specific parts of the Observable chain to be disposed of with different Schedulers.

Mouaad Aallam

Computer Software Engineer
