Reactive Programming with RxJava
Update Note: This RxJava series was written for RxJava 2.x (2019). RxJava 3.x introduced breaking changes and improvements. Core concepts remain the same, but some APIs have changed. Refer to the RxJava 3.x migration guide for differences.
Having become interested in Reactive Extensions and RxJava, I read Thomas Nield's excellent book Learning RxJava. The following are my notes.
Why RxJava?
- It addresses common problems such as concurrency, event handling, obsolete data states, and exception recovery.
- It helps keep code maintainable, reusable, and evolvable.
- It allows applications to be tactical and evolvable while maintaining stability in production.
Quickstart
In ReactiveX, the core type is the Observable, which essentially pushes things. A given Observable<T> pushes items of type T through a series of operators until they arrive at an Observer that consumes them. The following is an example of an Observable<String> that pushes three String objects:
import io.reactivex.Observable // RxJava 2.x package

fun main() {
    val observable = Observable.just("Hello", "world", "!")
}
Running this main method does nothing other than declare an Observable<String>. To make this Observable actually emit the three strings, an Observer needs to subscribe to it and receive the items:
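import io.reactivex.Observable // RxJava 2.x package

fun main() {
    val observable = Observable.just("Hello", "world", "!")
    // Subscribing triggers the emissions; the lambda receives each item.
    observable.subscribe {
        print("$it ")
    }
}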
This time, the output is the following:
Hello world !
What happened here is that the Observable<String> pushed each String object, one at a time, to the Observer lambda.
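The single lambda above only handles items. As a minimal sketch, assuming RxJava 2.x on the classpath, the same Observable can be subscribed to with three lambdas that cover the item, error, and completion events. The helper name collectEvents is hypothetical and exists only to record what the Observer receives:

```kotlin
import io.reactivex.Observable // RxJava 2.x; 3.x uses io.reactivex.rxjava3.core.Observable

// Hypothetical helper: records every event the Observer receives, in order.
fun collectEvents(): List<String> {
    val events = mutableListOf<String>()
    Observable.just("Hello", "world", "!")
        .subscribe(
            { item -> events.add("onNext: $item") },              // called once per item
            { error -> events.add("onError: ${error.message}") }, // called if the source fails
            { events.add("onComplete") }                          // called after the last item
        )
    return events
}

fun main() {
    collectEvents().forEach(::println)
    // onNext: Hello
    // onNext: world
    // onNext: !
    // onComplete
}
```

Because Observable.just() emits synchronously on the subscribing thread, the list is fully populated by the time subscribe() returns.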
It’s possible to use several operators between the Observable and the Observer to transform or otherwise manipulate each pushed item. The following is an example of map():
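import io.reactivex.Observable // RxJava 2.x package

fun main() {
    val observable = Observable.just("Hello", "world", "!")
    // map() transforms each item before it reaches the Observer.
    observable.map { it.uppercase() }.subscribe { print("$it ") }
}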
The output should be:
HELLO WORLD !
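Operators can also be chained, with each one receiving the items the previous one lets through. The following is a small sketch, assuming RxJava 2.x, that places filter() before map(). The function name shout and the length-based filter are made up for illustration:

```kotlin
import io.reactivex.Observable // RxJava 2.x; 3.x uses io.reactivex.rxjava3.core.Observable

// Hypothetical helper: drops one-character items, then uppercases the rest.
fun shout(words: List<String>): List<String> {
    val result = mutableListOf<String>()
    Observable.fromIterable(words)
        .filter { it.length > 1 }     // suppress items such as "!"
        .map { it.uppercase() }       // transform the items that remain
        .subscribe { result.add(it) } // the Observer collects the final items
    return result
}

fun main() {
    println(shout(listOf("Hello", "world", "!"))) // [HELLO, WORLD]
}
```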
RxJava vs Java 8 streams
How is Observable different from Java 8 Streams or Kotlin sequences? The key difference is that an Observable pushes items to its consumer, while Streams and sequences have their items pulled by the consumer.
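The contrast can be sketched side by side, assuming RxJava 2.x and a Kotlin sequence as the pull-based counterpart. The function names pullAll and pushAll are hypothetical:

```kotlin
import io.reactivex.Observable // RxJava 2.x; 3.x uses io.reactivex.rxjava3.core.Observable

// Pull: the consumer drives the loop and asks the sequence for each next item.
fun pullAll(): List<String> {
    val log = mutableListOf<String>()
    val iterator = sequenceOf("Hello", "world", "!").iterator()
    while (iterator.hasNext()) {
        log.add("pulled ${iterator.next()}")
    }
    return log
}

// Push: the consumer only registers a callback; the Observable calls it for each item.
fun pushAll(): List<String> {
    val log = mutableListOf<String>()
    Observable.just("Hello", "world", "!")
        .subscribe { log.add("pushed $it") }
    return log
}

fun main() {
    pullAll().forEach(::println) // pulled Hello, pulled world, pulled !
    pushAll().forEach(::println) // pushed Hello, pushed world, pushed !
}
```

In the pull version the calling code decides when to request the next item. In the push version the source decides when the callback runs. With just() that happens immediately during subscribe(), but a source is free to emit later.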
RxJava Series Guide
This is a comprehensive guide to RxJava organized by topic. Follow the links below for in-depth coverage:
Fundamentals
- Observable & Observer - Core concepts and Observable factories
- Hot vs Cold Observable - Understanding observable behavior patterns
- Observable Factories - Additional factory methods (range, interval, timer, etc.)
- Disposing - Resource management and stopping emissions
Operators
- Filtering & Control: Suppressing - filter, take, skip, distinct
- Transformation: Transforming - map, flatMap, concatMap, switchMap
- Aggregation: Reducing - count, reduce, all, any
- Collection: Collection - toList, toMap, collect
- Error Handling: Recovery - onErrorReturn, onErrorResumeNext
- Side Effects: Action - doOnNext, doOnComplete, doOnError
Advanced Topics
- Combining Observables - merge, concat, zip, combineLatest
- Multicasting - ConnectableObservable and sharing streams
- Replaying and Caching - replay() and cache() operators
- Subjects - PublishSubject, BehaviorSubject, and more
- Concurrency - subscribeOn and observeOn with Schedulers
- Parallelisation - Parallel execution strategies
Flow Control
- Buffering - Batch emissions into collections
- Windowing - Batch emissions into separate Observables
- Throttling - Control emission rate
- Switching - Cancel previous Observables
Backpressure
- Backpressure - Understanding and handling backpressure
- Flowable - Observable with backpressure support
- Subscriber - Consuming Flowables
Customization
- Transformers - Reusable operator chains
- Custom Operators - Building your own operators
Sources
Note: code examples in this article are written in Kotlin to showcase the interoperability between Java and Kotlin. For Kotlin projects, however, it is most likely better to use RxKotlin.