Reactive Programming with RxJava
Interested in Reactive Extensions and RxJava, I enjoyed reading the excellent book Learning RxJava by Thomas Nield. The following are my notes.
Why RxJava?
- Addresses concurrency, event handling, obsolete data states, and exception recovery.
- Keeps code maintainable, reusable, and evolvable.
- Allows applications to be tactical and evolvable while maintaining stability in production.
Quickstart
In ReactiveX, the core type is the Observable, which essentially pushes things. A given Observable<T> pushes things of type T through a series of operators until they arrive at an Observer that consumes the items. The following is an example of an Observable<String> that pushes three String objects:
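import io.reactivex.rxjava3.core.Observable // assumes RxJava 3

fun main() {
    // Declares the Observable; nothing is emitted until an Observer subscribes.
    val observable = Observable.just("Hello", "world", "!")
}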
Running this main method does nothing other than declare an Observable<String>. To make this Observable actually emit the three strings, an Observer needs to subscribe to it and receive the items:
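import io.reactivex.rxjava3.core.Observable // assumes RxJava 3

fun main() {
    val observable = Observable.just("Hello", "world", "!")
    // Subscribing makes the Observable push each item to this lambda.
    observable.subscribe {
        print("$it ")
    }
}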
This time, the output is the following:
Hello world !
What happened here is that the Observable<String> pushed each String object, one at a time, to the Observer lambda.
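Besides receiving items, an Observer can also be told when the source fails or finishes. The following is a minimal sketch of that, assuming the RxJava 3 package name (RxJava 2 uses io.reactivex.Observable instead); recordEvents is a hypothetical helper introduced only for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 package

// Hypothetical helper: subscribes with all three callbacks and records each event.
fun recordEvents(): List<String> {
    val events = mutableListOf<String>()
    Observable.just("Hello", "world", "!").subscribe(
        { item -> events += "onNext: $item" },              // called once per pushed item
        { error -> events += "onError: ${error.message}" }, // called if the source fails
        { events += "onComplete" }                          // called after the last item
    )
    return events
}

fun main() {
    recordEvents().forEach { println(it) }
}
```

Since Observable.just() cannot fail here, the error callback is never invoked: the list ends up holding three onNext entries followed by onComplete.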
It’s possible to place several operators between the Observable and the Observer to transform or otherwise manipulate each pushed item. The following is an example using map():
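import io.reactivex.rxjava3.core.Observable // assumes RxJava 3

fun main() {
    val observable = Observable.just("Hello", "world", "!")
    // map() transforms each item before it reaches the Observer.
    observable.map { it.uppercase() }.subscribe { print("$it ") }
}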
The output should be:
HELLO WORLD !
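Operators can also be chained, each one receiving what the previous one pushed. The following is a small sketch, assuming the RxJava 3 package name; shoutWords is a hypothetical helper, and filter() is used here only as one more example of an operator.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 package

// Hypothetical helper: drops one-character items, then upper-cases the rest.
fun shoutWords(): List<String> {
    val received = mutableListOf<String>()
    Observable.just("Hello", "world", "!")
        .filter { it.length > 1 }     // suppresses "!"
        .map { it.uppercase() }       // transforms each remaining item
        .subscribe { received += it } // the Observer collects what arrives
    return received
}

fun main() {
    println(shoutWords()) // prints [HELLO, WORLD]
}
```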
RxJava vs Java 8 streams
How is an Observable any different from Java 8 Streams or Kotlin sequences? The key difference is that an Observable pushes items to its consumer, while Streams and sequences have the consumer pull them.
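To make the contrast concrete, here is a rough side-by-side sketch, assuming the RxJava 3 package name; pullWords and pushWords are hypothetical helpers written for this note. In the pull version the caller drives the iteration, while in the push version the Observable drives it by invoking the callback.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 package

// Pull: the consumer asks the sequence for each next element.
fun pullWords(): List<String> {
    val iterator = sequenceOf("Hello", "world", "!").iterator()
    val received = mutableListOf<String>()
    while (iterator.hasNext()) {
        received += iterator.next() // the caller requests the next item
    }
    return received
}

// Push: the Observable calls the Observer lambda for each item.
fun pushWords(): List<String> {
    val received = mutableListOf<String>()
    Observable.just("Hello", "world", "!")
        .subscribe { received += it } // the source hands items to the callback
    return received
}

fun main() {
    println(pullWords()) // prints [Hello, world, !]
    println(pushWords()) // prints [Hello, world, !]
}
```

Both functions end up with the same items; what differs is which side decides when the next item moves.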
Advanced RxJava
The following are more detailed notes for a deeper understanding of RxJava:
- Observable & Observer
- Hot vs Cold Observable
- Observable Factories
- Disposing
- Suppressing, Transforming, Reducing, Collection, Recovery, and Action Operators
- Combining Observables
- Multicasting, Replaying, Caching, and Subjects
- Concurrency and Parallelisation
- Buffering, Windowing, Throttling and Switching
- Backpressure, Flowable and Subscriber
- Transformers and Custom Operators
Sources
Note: the code examples in this article are written in Kotlin to showcase the interoperability between Java and Kotlin. For Kotlin projects, however, it is most likely better to use RxKotlin.