RxJava: Action Operators
Action operators (the doOn family) are helpful operators that assist in debugging and give visibility into an Observable chain.
doOnNext()
The doOnNext() operator lets you peek at each emission coming out of one operator and going into the next:
import io.reactivex.Observable

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
        .doOnNext { s -> println("Processing: $s") } // peek before map() sees the item
        .map { s -> s.length }
        .subscribe { i -> println("Received: $i") }
}
Processing: Alpha
Received: 5
Processing: Beta
Received: 4
Processing: Gamma
Received: 5
Processing: Delta
Received: 5
It is also possible to use doAfterNext(), which performs the action after the emission is passed downstream rather than before.
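The difference in ordering between doOnNext() and doAfterNext() can be illustrated with a minimal sketch. It assumes RxJava 2 (the io.reactivex package); the helper name traceNextOrdering and the log list are hypothetical, introduced only so the order of events can be inspected.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: records events in a list so their order can be checked.
fun traceNextOrdering(): List<String> {
    val log = mutableListOf<String>()
    Observable.just("Alpha", "Beta")
        .doOnNext { s -> log.add("Before: $s") }    // runs before the item is passed downstream
        .doAfterNext { s -> log.add("After: $s") }  // runs after downstream onNext() has returned
        .subscribe { s -> log.add("Received: $s") }
    return log
}

fun main() {
    traceNextOrdering().forEach(::println)
}
```

For each item, the "Before" line comes first, then "Received", then "After", because doAfterNext() waits for the downstream observer to finish handling the item.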
doOnComplete()
The doOnComplete() operator fires off an action when onComplete() is called at that point in the Observable chain:
import io.reactivex.Observable

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta")
        .doOnComplete { println("Done emitting") } // runs when the source signals onComplete()
        .map { s -> s.length }
        .subscribe { i -> println("Received: $i") }
}
Received: 5
Received: 4
Received: 5
Received: 5
Done emitting
doOnError()
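The doOnError() operator peeks at an error as it passes down the chain. Placing it between operators helps pinpoint which one failed. Here only the second doOnError() fires, because the exception is thrown in map(), downstream of the first: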
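import io.reactivex.Observable

fun main() {
    Observable.just(5, 2, 4, 0, 3, 2, 8)
        .doOnError { println("Source failed!") }   // never fires: the source itself does not fail
        .map { i -> 10 / i }                       // throws ArithmeticException when i == 0
        .doOnError { println("Division failed!") } // fires: the error comes from map() above
        .subscribe(
            { i -> println("Received: $i") },
            { e -> println("Error: $e") }
        )
}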
Received: 2
Received: 5
Received: 2
Division failed!
Error: java.lang.ArithmeticException: / by zero
doOnEach() and doOnTerminate()
doOnEach(): useful for placing an observer in the middle of the chain to peek at every event (onNext(), onComplete(), and onError()).
doOnTerminate(): fires for an onComplete() or onError() event.
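A minimal sketch of both operators, assuming RxJava 2 and its Notification-based overload of doOnEach(). The helper name traceEachAndTerminate and the log list are hypothetical and exist only to make the event order visible.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: records every event seen by doOnEach() and doOnTerminate().
fun traceEachAndTerminate(): List<String> {
    val log = mutableListOf<String>()
    Observable.just("Alpha", "Beta")
        .doOnEach { n ->
            // n is a Notification wrapping an onNext, onComplete, or onError signal
            when {
                n.isOnNext -> log.add("Each: onNext(${n.value})")
                n.isOnComplete -> log.add("Each: onComplete")
                else -> log.add("Each: onError(${n.error})")
            }
        }
        .doOnTerminate { log.add("Terminating") } // runs for onComplete() or onError()
        .subscribe { s -> log.add("Received: $s") }
    return log
}

fun main() {
    traceEachAndTerminate().forEach(::println)
}
```

With this source, doOnEach() sees two onNext signals and one onComplete, and doOnTerminate() runs once when the completion signal reaches it.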
doOnSubscribe()
At a specific point in an Observable chain, doOnSubscribe() runs an action the moment a subscription occurs, passing it the Disposable for that subscription:
import io.reactivex.Observable

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .map { it.length }
        .doOnSubscribe { println("Subscribed: $it") } // `it` is the Disposable for this subscription
        .subscribe { println("Received: $it") }
}
Subscribed: io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@53bd815b
Received: 5
Received: 4
Received: 5
doOnDispose()
At a specific point in an Observable chain, doOnDispose() performs a specific action when disposal is executed:
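import io.reactivex.Observable
import io.reactivex.disposables.Disposable

fun main() {
    var disposable: Disposable? = null
    Observable.just("Alpha", "Beta", "Gamma")
        .doAfterNext { disposable?.dispose() }  // dispose right after the first item is delivered
        .doOnDispose { println("Disposing!") }
        .doOnSubscribe { disposable = it }      // capture the Disposable for later use
        .subscribe { println("Received: $it") }
}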
Received: Alpha
Disposing!
doOnDispose() can fire multiple times for multiple disposal requests, or not at all if the chain is never disposed.
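The "not at all" case can be sketched as follows, assuming RxJava 2. Nothing calls dispose() here, so the chain simply runs to completion. The helper name traceDisposeOnCompletion and the log list are hypothetical.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: the chain completes normally and is never disposed by the subscriber.
fun traceDisposeOnCompletion(): List<String> {
    val log = mutableListOf<String>()
    Observable.just("Alpha", "Beta")
        .doOnDispose { log.add("Disposing!") }  // only runs if downstream calls dispose()
        .subscribe { s -> log.add("Received: $s") }
    return log
}

fun main() {
    traceDisposeOnCompletion().forEach(::println)
}
```

Only the two "Received" lines are recorded. Normal completion is not a disposal request, so an action that must run in every case is better placed in a different operator.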
doFinally()
This operator fires after onComplete(), onError(), or disposal:
import io.reactivex.Observable

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .doFinally { println("Done !") } // runs once the sequence has terminated or been disposed
        .subscribe { println("Received: $it") }
}
Received: Alpha
Received: Beta
Received: Gamma
Done !
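The example above covers the onComplete() case. A minimal sketch of the onError() case, assuming RxJava 2, might look like the following. The helper name traceFinallyOnError and the log list are hypothetical.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: the chain fails in map(), and doFinally() still runs.
fun traceFinallyOnError(): List<String> {
    val log = mutableListOf<String>()
    Observable.just(5, 0, 2)
        .map { i -> 10 / i }                 // throws ArithmeticException on the second item
        .doFinally { log.add("Done !") }     // runs after the error has been delivered downstream
        .subscribe(
            { i -> log.add("Received: $i") },
            { e -> log.add("Error: ${e.message}") }
        )
    return log
}

fun main() {
    traceFinallyOnError().forEach(::println)
}
```

The final action is recorded after the error handler has run, and the third item is never emitted because the sequence ends at the failure.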
doOnSuccess()
The doOnSuccess() operator exists because types like Single and Maybe have onSuccess instead of onNext:
import io.reactivex.Observable

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .firstElement()                        // converts the Observable into a Maybe
        .doOnSuccess { println("Success !") }
        .subscribe { println("Received: $it") }
}
Success !
Received: Alpha
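As a further sketch, assuming RxJava 2, the same operator can be tried on a Single and on a Maybe that turns out to be empty. The helper name traceSuccess and the log list are hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical helper: compares doOnSuccess() on a Single and on an empty Maybe.
fun traceSuccess(): List<String> {
    val log = mutableListOf<String>()

    // A Single ends with either onSuccess or onError.
    Single.just("Alpha")
        .doOnSuccess { s -> log.add("Single success: $s") }
        .subscribe(
            { s -> log.add("Single received: $s") },
            { e -> log.add("Single error: $e") }
        )

    // An empty Maybe completes without a value, so doOnSuccess() is skipped.
    Observable.empty<String>()
        .firstElement()
        .doOnSuccess { s -> log.add("Maybe success: $s") }
        .subscribe(
            { s -> log.add("Maybe received: $s") },
            { e -> log.add("Maybe error: $e") },
            { log.add("Maybe completed empty") }
        )
    return log
}

fun main() {
    traceSuccess().forEach(::println)
}
```

The Single records its success action and its value, while the empty Maybe records only the completion callback, since there is no value to report.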