Reactive Programming with RxJava
- 3 mins
Interested in Reactive Extensions and RxJava, I enjoyed reading the excellent book: Learning RxJava by Thomas Nield, and the following are my notes.
Why RxJava?
- Concurrency, event handling, obsolete data states, and exception recovery.
- Maintainable, reusable, and evolvable.
- Allows applications to be tactical and evolvable while maintaining stability in production.
Quickstart
In ReactiveX, the core type is the Observable which essentially pushes things. A given Observable<T> pushes things of type T through a series of operators until it arrives at an Observer that consumes the items. The following is an example of an Observable<String> that pushes three String objects:
fun main() {
val observable = Observable.just("Hello", "world", "!")
}
Running this main method isn’t doing anything other than declare a Observable<String>. To make this Observable actually emit these three strings, anObserver need to subscribe to it and receive the items:
fun main() {
val observable = Observable.just("Hello", "world", "!")
observable.subscribe {
print("$it ")
}
}
This time, the output is the following:
Hello, world!
What happened here is that Observable<String> pushed each String object once at a time to the Observer lambda.
It’s possible to use several operators between Observable and Observer to transform each pushed item or manipulate them, the following is an example of map():
fun main() {
val observable = Observable.just("Hello", "world", "!")
observable.map { it.uppercase() }.subscribe { print("$it ") }
}
The output should be:
HELLO WORLD!
RxJava vs Java 8 streams
How Observable is any different from Java 8 Streams or Kotlin sequences? The key difference is that Observable pushes the items while Streams and sequences pull the items.
Advanced RxJava
The following are more detailed notes for a deeper understanding of RxJava:
- Observable & Observer
- Hot vs Cold Observable
- Observable Factories
- Disposing
- Supressing, Transforming, Reducing, Collection, Recovery and Action Operators.
- Combining Observables
- Multicasting, Replaying and Caching and Subjects
- Concurrency and Parallelisation
- Buffering, Windowing, Throttling and Switching
- Backpressure, Flowable and Subscriber
- Transformers and Custom Operators
Sources
Note: code examples in this article are written in Kotlin to showcase the interoperability between Java and Kotlin, however, for Kotlin projects, it is most likely better to use RxKotlin.