RxJava: Subjects
- 3 minsErik Meijer, the creator of ReactiveX, describes Subjects as the “mutable variables of reactive programming“ . Subjects are both an Observer
and an Observable
acting as a proxy mulitcasting device (like an event bus) .
PublishSubject
fun main() {
val subject = PublishSubject.create<String>()
subject.map(String::length).subscribe { println("Received: $it") }
subject.onNext("Alpha")
subject.onNext("Beta")
subject.onNext("Gamma")
subject.onComplete()
}
Received: 5
Received: 4
Received: 5
When to use
Subjects are good to eagerly subscribe to an unknown number of multiple source Observable
s and consolidate their emissions as a single Observable
:
fun main() {
val source1 = Observable.interval(1, TimeUnit.SECONDS).map { "${it + 1} seconds" }
val source2 = Observable.interval(300, TimeUnit.MILLISECONDS).map { "${(it + 1) * 300} milliseconds" }
val subject = PublishSubject.create<String>()
subject.subscribe { println(it) }
source1.subscribe(subject)
source2.subscribe(subject)
TimeUnit.SECONDS.sleep(3)
}
300 milliseconds
600 milliseconds
900 milliseconds
1 seconds
1200 milliseconds
1500 milliseconds
1800 milliseconds
2 seconds
2100 milliseconds
2400 milliseconds
2700 milliseconds
3 seconds
3000 milliseconds
When goes wrong
Subjects are hot, executing the onNext()
calls before the Observers are subscribed would result in these emissions being missed:
fun main() {
val subject = PublishSubject.create<String>()
subject.onNext("Alpha")
subject.onNext("Beta")
subject.onNext("Gamma")
subject.onComplete()
subject.map(String::length).subscribe { println("Received: $it") }
}
(No output)
Serialising
In Subjects, the onSubscribe()
, onNext()
, onError()
, and onComplete()
calls are not thread-safe. toSerialized()
wraps the Subject
to make it thread-safe:
PublishSubject.create<String>().toSerialized()
Other Subjects
BehaviorSubject
: replays the last emitted item to each newObserver
downstream.ReplaySubject
: similar toPublishSubject
followed by acache()
operator.AsyncSubject
: only pushes the last value it receives, followed by anonComplete()
event.UnicastSubject
: buffers all the emissions it receives until anObserver
subscribes to it, and then it releases all these emissions to theObserver
and clear its cache.