RxJava: Hot vs Cold Observable

- 3 mins

Observables can be cold or hot, which defines how they behave when there are multiple Observers.

Cold Observables

Cold Observables will replay the emissions to each Observer, ensuring that all Observers get all the data.
Usually, most data-driven (finite datasets) Observables are cold (including Observable.just and Observable.fromIterable), example:

fun main() {
    val source: Observable<String> = Observable.just("Alpha", "Beta", "Charlie", "Delta", "Epsilon")

    // First observer
    source.map { it.length }.subscribe{ println("[Observer1] Received: $it") }

    // Second observer
    source.subscribe{ println("[Observer2] Received: $it") }
}
[Observer1] Received: 5
[Observer1] Received: 4
[Observer1] Received: 7
[Observer1] Received: 5
[Observer1] Received: 7
[Observer2] Received: Alpha
[Observer2] Received: Beta
[Observer2] Received: Charlie
[Observer2] Received: Delta
[Observer2] Received: Epsilon

Both Observers receive the same datasets by getting two separate streams each.

Hot Observables

Hot Observables often represent events rather than finite datasets. The events can carry data with them, but there is a time-sensitive component where late observers can miss previously emitted data. UI events or server requests, for example, can be represented as a hot Observable. The following is an example of a JavaFX UI with a Togglebutton and a Label, It creates an Observable which emits toggling actions, the subscribed Observer consumes this information and changes the Label accordingly:

class App : Application() {
    override fun start(primaryStage: Stage?) {
        // ...
        toggleButton.selectedProperty().toObservable()
            .map { if (it) "DOWN" else "UP" }
            .subscribe { label.text = it }
        // ...
    }
}

private fun <T> ObservableValue<T>. toObservable(): Observable<T> {
    return Observable.create { observableEmitter ->
        //emit initial state
        observableEmitter.onNext(value)
        //emit value changes uses a listener
        addListener { _, _, newValue -> observableEmitter.onNext(newValue) }
    }
}

If there was new Observers to this ToggleButton’s events after emissions have occurred, those new Observers will have missed these emissions.
While many hot Observables are indeed infinite, they do not have to be. They just have to share emissions to all Observers simultaneously and not replay missed emissions for tardy Observers.

ConnectableObservable

ConnectableObservable takes any Observable (even if it is cold) and makes it hot, so that all emissions are played to all Observers at once.
Calling publish() on any Observable returns a ConnectableObservable. Subscribing to the ConnectableObservable won’t start the emissions, connect() must be called to start firing them:

fun main() {
    val source: ConnectableObservable<String> = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon").publish()
    //Set up observer 1
    source.subscribe { println("Observer 1: $it") }
    //Set up observer 2
    source.map { it.length }.subscribe { println("Observer 2: $it") }
    //Fire!
    source.connect()
}
Observer 1: Alpha
Observer 2: 5
Observer 1: Beta
Observer 2: 4
Observer 1: Gamma
Observer 2: 5
Observer 1: Delta
Observer 2: 5
Observer 1: Epsilon
Observer 2: 7

Using ConnectableObservable allows the set up all Observers beforehand and force each emission to go to all Observers simultaneously (multicasting).

Mouaad Aallam

Mouaad Aallam

Computer Software Engineer

rss facebook twitter github youtube mail spotify instagram linkedin google pinterest medium vimeo gitlab docker