RxJava: Disposing

- 5 mins

The Disposable is a link between an Observable and an active Observer, calling its dispose() method stops the emissions and dispose of all resources used for that Observer:

fun main() {
    val seconds = Observable.interval(1, TimeUnit.SECONDS)
    val disposable = seconds.subscribe { println("Received: $it") }
    TimeUnit.SECONDS.sleep(5)
    // Dispose and stop emissions.
    disposable.dispose()
    // Sleep 5 secs: no new emissions.
    TimeUnit.SECONDS.sleep(5)
}
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4

Disposable within an Observer

An Observer receives it’s own Disposable when subscribing as an argument of onSubscribe(d: Disposable). The following is an example of an Observer self-disposing after consuming an emission:

fun main() {
    val seconds = Observable.interval(1, TimeUnit.SECONDS)
    seconds.subscribe(MyObserver())
    TimeUnit.SECONDS.sleep(5)

}

class MyObserver<T> : Observer<T> {
    private lateinit var disposable: Disposable

    override fun onSubscribe(disposable: Disposable) {
        this.disposable = disposable
    }

    override fun onNext(t: T) {
        println("Received: $t")
        disposable.dispose() // Self dispose
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("Done !")
    }
}

Received: 0

By default, calling subscribe() with an Observer instance don’t return a Disposable, but it possible to get it by extending ResourceObserver and subscribing using subscribeWith():

fun main() {
    val seconds = Observable.interval(1, TimeUnit.SECONDS)
    val disposable: Disposable = seconds.subscribeWith(MyResourceObserver())
    TimeUnit.SECONDS.sleep(5)
    // Dispose and stop emissions.
    disposable.dispose()
    // Sleep 5 secs: no new emissions.
    TimeUnit.SECONDS.sleep(5)
}

class MyResourceObserver<T> : ResourceObserver<T>() {
    override fun onNext(t: T) {
        println("Received: $t")
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("Done !")
    }
}
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4

CompositeDisposable

To manage and dispose of several subscriptions, CompositeDisposable is useful:

fun main() {
    val disposables = CompositeDisposable()
    val seconds = Observable.interval(1, TimeUnit.SECONDS)
    // Subscribe and capture disposables
    val disposable1 = seconds.subscribe { println("Observer 1: $it") }
    val disposable2 = seconds.subscribe { println("Observer 2: $it") }
    // Put disposables into CompositeDisposable
    disposables.addAll(disposable1, disposable2)
    // Sleep 5 secs
    TimeUnit.SECONDS.sleep(3)
    //dispose all disposables
    disposables.dispose()
    //sleep 5 seconds: no emissions.
    TimeUnit.SECONDS.sleep(3)
}
Observer 1: 0
Observer 2: 0
Observer 1: 1
Observer 2: 1
Observer 1: 2
Observer 2: 2

Disposal with Observable.create()

In case of an Observable.create() returning a long-running or infinite Observable, ideally, the emitter’s isDisposed() should checked regularly to see whether to keep sending emissions:

fun main() {
    val source: Observable<Int> = Observable.create { observableEmitter ->
        try {

            for (i in 0..1_000_000) {
                if (observableEmitter.isDisposed)
                    return@create
                observableEmitter.onNext(i)
            }
            observableEmitter.onComplete()

        } catch (e: Exception) {
            observableEmitter.onError(e)
        }
    }
}

In case Observable.create() is wrapped around some resource, the disposal of that resource must be handled to prevent leaks. ObservableEmitter has the setCancellable() and setDisposable() methods for that:

fun <T> valuesOf(fxObservable: ObservableValue<T>): Observable<T> {
    return Observable.create { observableEmitter ->
        // Emit initial state
        observableEmitter.onNext(fxObservable.value)
        // Emit value changes uses a listener
        val listener = ChangeListener<T> { _, _, current -> observableEmitter.onNext(current) }
        // Add listener to ObservableValue
        fxObservable.addListener(listener)
        // Handle disposing by specifying cancellable
        observableEmitter.setCancellable { fxObservable.removeListener(listener) }
    }
}
Mouaad Aallam

Mouaad Aallam

Software Engineer

rss facebook twitter github youtube mail spotify instagram linkedin google pinterest medium vimeo mastodon gitlab docker