RxJava: Action Operators

- 5 mins

Action operators (doOn) are helpful operators that can assist in debugging and getting visibility into an Observable chain.

doOnNext()

The doOnNext() operator allow to peek at each emission coming out of an operator and going into the next:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
        .doOnNext { s -> println("Processing: $s") }
        .map { s -> s.length}
        .subscribe { i -> println("Received: $i") }
}
Processing: Alpha
Received: 5
Processing: Beta
Received: 4
Processing: Gamma
Received: 5
Processing: Delta
Received: 5

It also possible to leverage doAfterNext() which performs the action after the emission is passed downstream rather than before.

doOnComplete()

The doOnComplete() operator fires off an action when onComplete() is called at the point in the Observable chain:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
        .doOnComplete { println("Done emitting") }
        .map { s -> s.length }
        .subscribe { i -> println("Received: $i") }
}
Received: 5
Received: 4
Received: 5
Received: 5
Done emitting

doOnError()

The doOnError() peeks at the error being emitted up the chain. This is helpful to put between operators to see which one causes an error:

fun main() {
    Observable.just(5, 2, 4, 0, 3, 2, 8)
        .doOnError { println("Source failed!") }
        .map { i -> 10 / i }
        .doOnError { println("Division failed!") }
        .subscribe(
            { i -> println("Received: $i") },
            { e -> println("Error: $e") }
        )
}
Received: 2
Received: 5
Received: 2
Division failed!
Error: java.lang.ArithmeticException: / by zero

doOnEach() and doOnTerminate()

doOnSubscribe()

At a specific point in an Observable chain, doOnSubscribe() fires a specific Disposable the moment a subscription occurs:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .map { it.length }
        .doOnSubscribe { println("Subscribed: $it") }
        .subscribe { println("Received: $it") }
}
Subscribed: io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@53bd815b
Received: 5
Received: 4
Received: 5

doOnDispose()

At a specific point in an Observable chain, doOnDispose() will perform a specific action when disposal is executed:

fun main() {
    var disposable: Disposable? = null
    Observable.just("Alpha", "Beta", "Gamma")
        .doAfterNext { disposable?.dispose() }
        .doOnDispose { println("Disposing!") }
        .doOnSubscribe { disposable = it }
        .subscribe { println("Received: $it") }
}
Received: Alpha
Disposing!

doOnDispose() can fire multiple times for multiple disposal requests or not at all if it is not disposed.

doFinally

This operator will fire after either onComplete() , onError() or disposing: ``

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .doFinally { println("Done !") }
        .subscribe { println("Received: $it") }
}
Received: Alpha
Received: Beta
Received: Gamma
Done !

doOnSuccess()

The operator onSuccess() exists because of types like Single and Maybe which do have onSuccess instead of onNext:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .firstElement()
        .doOnSuccess { println("Success !")  }
        .subscribe { println("Received: $it") }
}
Success !
Received: Alpha
Mouaad Aallam

Mouaad Aallam

Software Engineer

rss facebook twitter github youtube mail spotify instagram linkedin google pinterest medium vimeo mastodon gitlab docker