RxJava: Parallelisation
- 3 minsLet’s define parallelism as processing multiple emissions at a time for a given Observable
. Although, the Observable
contract dictates that emissions must be pushed serially down to an Observable
, RxJava gives enough operators and tools to be clever.
Let’s take the following example:
fun main() {
Observable.range(1, 10)
.map { i -> intenseCalculation(i) }
.subscribe { System.out.println("Received $it ${LocalTime.now()}") }
}
Received 1 20:40:34.094
Received 2 20:40:36.109
Received 3 20:40:39.119
Received 4 20:40:42.128
Received 5 20:40:45.132
Received 6 20:40:45.132
Received 7 20:40:46.138
Received 8 20:40:49.140
Received 9 20:40:52.148
Received 10 20:40:54.149
The previous example took around 20 seconds to finish !
Operator: flatmap()
Let’s parallelise the previous example using flatmap
:
fun main() {
Observable.range(1, 10)
.flatMap {
Observable.just(it)
.subscribeOn(Schedulers.computation())
.map { i -> intenseCalculation(i) }
}
.subscribe { System.out.println("[${Thread.currentThread().name}] Received $it ${LocalTime.now()}") }
TimeUnit.SECONDS.sleep(5)
}
[RxComputationThreadPool-1] Received 1 20:55:20.026
[RxComputationThreadPool-7] Received 7 20:55:20.982
[RxComputationThreadPool-10] Received 5 20:55:20.985
[RxComputationThreadPool-10] Received 10 20:55:20.985
[RxComputationThreadPool-3] Received 3 20:55:21.982
[RxComputationThreadPool-3] Received 4 20:55:21.983
[RxComputationThreadPool-3] Received 9 20:55:21.983
[RxComputationThreadPool-2] Received 2 20:55:22.982
[RxComputationThreadPool-2] Received 6 20:55:22.983
[RxComputationThreadPool-2] Received 8 20:55:22.983
This time, it took only 3 seconds to complete !
An Observable
is created from each emission, emit it on a computation thread using subscribeOn()
, perform the intenseCalculation()
, and finally, flatMap()
will merge all of the threads safely back into a serialised stream.
Note: If a thread is already pushing an emission out of flatMap()
, any threads also waiting to push emissions will simply leave their emissions for that occupying thread to take ownership of.
Operator: groupBy()
Another way to parallelise the same example is by using groupBy()
and GroupedObservables
. This can be useful to restrict the number of thread to parallelise on (to the number of processor cores for example):
fun main() {
val coreCount = Runtime.getRuntime().availableProcessors()
val assigner = AtomicInteger(0)
Observable.range(1, 10)
.groupBy { assigner.incrementAndGet() % coreCount }
.flatMap { groupedObservable ->
groupedObservable
.observeOn(Schedulers.io())
.map { i -> intenseCalculation(i) }
}
.subscribe { System.out.println("[${Thread.currentThread().name}] Received $it ${LocalTime.now()}") }
TimeUnit.SECONDS.sleep(5)
}
[RxCachedThreadScheduler-5] Received 5 21:29:20.407
[RxCachedThreadScheduler-5] Received 3 21:29:20.415
[RxCachedThreadScheduler-5] Received 4 21:29:20.415
[RxCachedThreadScheduler-5] Received 7 21:29:20.415
[RxCachedThreadScheduler-5] Received 9 21:29:20.415
[RxCachedThreadScheduler-8] Received 8 21:29:21.344
[RxCachedThreadScheduler-8] Received 10 21:29:21.344
[RxCachedThreadScheduler-6] Received 6 21:29:22.343
[RxCachedThreadScheduler-6] Received 1 21:29:22.343
[RxCachedThreadScheduler-2] Received 2 21:29:23.340
Again, it took only 3 seconds to complete!
Note: GroupedObservables
are not necessarily impacted by subscribeOn()
, that’s why observeOn()
has been used here to parallelise instead.