RxJava: Custom Operators

Creating custom operators is the last resort when existing operators and/or transformers can’t do (or can’t easily do) a specific task.

ObservableOperator

Let's create a custom doOnEmpty() operator: it executes an Action when onComplete() is called and no emissions have occurred.

To do so, let's implement ObservableOperator<Downstream, Upstream> and its apply() method. This method accepts the downstream Observer<Downstream> as its argument and returns an Observer<Upstream> that is subscribed to the upstream:

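// RxJava 2 package names are assumed here
import io.reactivex.Observable
import io.reactivex.ObservableOperator
import io.reactivex.functions.Action
import io.reactivex.observers.DisposableObserver

fun <T> doOnEmpty(action: Action): ObservableOperator<T, T> {
    // `observer` is the downstream; the returned object is subscribed to the upstream
    return ObservableOperator { observer ->
        object : DisposableObserver<T>() {
            // Per-subscription flag: stays true until the first emission
            var empty = true

            override fun onComplete() {
                if (empty) {
                    try {
                        action.run()
                    } catch (e: Exception) {
                        // A failing action is reported as onError instead of onComplete
                        onError(e)
                        return
                    }
                }
                observer.onComplete()
            }

            override fun onNext(t: T) {
                this.empty = false
                observer.onNext(t)
            }

            override fun onError(e: Throwable) {
                observer.onError(e)
            }

        }
    }
}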

Now let's use this ObservableOperator by passing it to lift():

fun main() {
    Observable.range(1, 3)
        .lift(doOnEmpty(Action { println("Source 1: Empty!!") }))
        .subscribe { println("Received: $it") }

    Observable.empty<Int>()
        .lift(doOnEmpty(Action { println("Source 2: Empty!!") }))
        .subscribe { println("Received: $it") }
}

Output:

Received: 1
Received: 2
Received: 3
Source 2: Empty!!

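Note: when creating custom operators, avoid sharing state between subscriptions unless that is the intended behavior. Keeping state inside the Observer returned from apply(), as the empty flag is above, gives every subscription its own copy.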
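
As a minimal sketch of the note above, the hypothetical withIndex() operator below (not part of RxJava; RxJava 2 package names are assumed) keeps its counter inside the Observer created in apply(), so two subscriptions to the same lifted Observable each start counting from zero:

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableOperator
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical operator for illustration: pairs each item with its 0-based index.
fun <T : Any> withIndex(): ObservableOperator<Pair<Int, T>, T> {
    return ObservableOperator { downstream ->
        // A new Observer, and therefore a new counter, is created per subscription.
        object : Observer<T> {
            var index = 0

            override fun onSubscribe(d: Disposable) = downstream.onSubscribe(d)
            override fun onNext(t: T) = downstream.onNext(index++ to t)
            override fun onError(e: Throwable) = downstream.onError(e)
            override fun onComplete() = downstream.onComplete()
        }
    }
}

fun main() {
    val source = Observable.just("a", "b").lift(withIndex<String>())
    source.subscribe { println("First: $it") }   // First: (0, a), First: (1, b)
    source.subscribe { println("Second: $it") }  // Second: (0, a), Second: (1, b)
}
```

If the counter were declared outside the lambda instead, both subscriptions would share it and the second one would continue from 2.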

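There are a few rules in the Observable contract that must be followed; breaking them can have unintended consequences downstream. In particular, onComplete() and onError() are terminal and mutually exclusive (never call one after the other), onNext() must not be called after either of them, and no events should be emitted after disposal.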
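
One common way to stay within the contract is to track a done flag and ignore anything that arrives after a terminal event. The sketch below is only an illustration under that assumption: safeMap() is a hypothetical operator (not an RxJava API, RxJava 2 package names assumed) that maps each item and, on the first failure, disposes the upstream and emits a single onError():

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableOperator
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

// Hypothetical operator: maps each item and stops the sequence on the first failure.
fun <T : Any, R : Any> safeMap(mapper: (T) -> R): ObservableOperator<R, T> {
    return ObservableOperator { downstream ->
        object : Observer<T> {
            private var upstream: Disposable? = null
            private var done = false // true once a terminal event was forwarded

            override fun onSubscribe(d: Disposable) {
                upstream = d
                downstream.onSubscribe(d)
            }

            override fun onNext(t: T) {
                if (done) return // no onNext after a terminal event
                val mapped = try {
                    mapper(t)
                } catch (e: Exception) {
                    upstream?.dispose() // stop the source before signalling the error
                    onError(e)
                    return
                }
                downstream.onNext(mapped)
            }

            override fun onError(e: Throwable) {
                if (done) return // at most one terminal event
                done = true
                downstream.onError(e)
            }

            override fun onComplete() {
                if (done) return
                done = true
                downstream.onComplete()
            }
        }
    }
}

fun main() {
    Observable.just("1", "2", "x", "4")
        .lift(safeMap<String, Int> { it.toInt() })
        .subscribe(
            { println("Received: $it") },
            { println("Error: ${it.javaClass.simpleName}") }
        )
    // Received: 1
    // Received: 2
    // Error: NumberFormatException
}
```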

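Event calls can also be manipulated and mixed as needed. For example, toImmutableList() swallows each onNext() from the upstream and emits the collected items as a single List when onComplete() arrives: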

fun main() {
    Observable.range(1, 3)
        .lift(toImmutableList())
        .subscribe { println("Received: $it") }

    Observable.empty<Int>()
        .lift(toImmutableList())
        .subscribe { println("Received: $it") }
}

fun <T> toImmutableList(): ObservableOperator<List<T>, T> {
    return ObservableOperator { observer ->
        object : DisposableObserver<T>() {
            // Per-subscription buffer for the upstream emissions
            val mutableList = mutableListOf<T>()

            override fun onNext(t: T) {
                // Collect the item instead of forwarding it
                this.mutableList.add(t)
            }

            override fun onError(e: Throwable) {
                observer.onError(e)
            }

            override fun onComplete() {
                // Emit a read-only copy as the single item, then complete
                observer.onNext(mutableList.toList())
                observer.onComplete()
            }
        }
    }
}

Output:

Received: [1, 2, 3]
Received: []

FlowableOperator

FlowableOperator is implemented in a similar manner to ObservableOperator. For example:

fun main() {
    Flowable.range(1, 3)
        .lift(toImmutableList())
        .subscribe { println("Received: $it") }

    Flowable.empty<Int>()
        .lift(toImmutableList())
        .subscribe { println("Received: $it") }
}

private fun <T> toImmutableList(): FlowableOperator<List<T>, T> {
    // `subscriber` is the downstream Subscriber
    return FlowableOperator { subscriber ->
        object : DisposableSubscriber<T>() {
            // Per-subscription buffer for the upstream emissions
            val mutableList = mutableListOf<T>()

            override fun onNext(t: T) {
                this.mutableList.add(t)
            }

            override fun onError(e: Throwable) {
                subscriber.onError(e)
            }

            override fun onComplete() {
                // Emit a read-only copy as the single item, then complete
                subscriber.onNext(mutableList.toList())
                subscriber.onComplete()
            }
        }
    }
}

Output:

Received: [1, 2, 3]
Received: []

The Subscriber passed to apply() (the lambda parameter) is the downstream: events are sent to it. The Subscriber returned from apply() receives events from the upstream and relays them to the downstream.
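
To illustrate this relaying, here is a hedged sketch of the doOnEmpty() idea as a FlowableOperator. The name doOnEmptyFlowable() is hypothetical and RxJava 2 package names are assumed. Unlike the DisposableSubscriber version, it implements Subscriber directly and passes the upstream Subscription on to the downstream, so the downstream's request() calls reach the source:

```kotlin
import io.reactivex.Flowable
import io.reactivex.FlowableOperator
import io.reactivex.functions.Action
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Hypothetical Flowable counterpart of doOnEmpty(), for illustration only.
fun <T : Any> doOnEmptyFlowable(action: Action): FlowableOperator<T, T> {
    return FlowableOperator { downstream ->
        object : Subscriber<T> {
            private var empty = true // per-subscription state

            // Hand the upstream Subscription to the downstream unchanged.
            override fun onSubscribe(s: Subscription) = downstream.onSubscribe(s)

            override fun onNext(t: T) {
                empty = false
                downstream.onNext(t)
            }

            override fun onError(e: Throwable) = downstream.onError(e)

            override fun onComplete() {
                if (empty) {
                    try {
                        action.run()
                    } catch (e: Throwable) {
                        // A failing action ends the sequence with onError only
                        downstream.onError(e)
                        return
                    }
                }
                downstream.onComplete()
            }
        }
    }
}

fun main() {
    Flowable.range(1, 3)
        .lift(doOnEmptyFlowable<Int>(Action { println("Source 1: Empty!!") }))
        .subscribe { println("Received: $it") }

    Flowable.empty<Int>()
        .lift(doOnEmptyFlowable<Int>(Action { println("Source 2: Empty!!") }))
        .subscribe { println("Received: $it") }
}
```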

Singles, Maybes, and Completables

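There are Transformer and operator counterparts for Single, Maybe, and Completable: SingleTransformer and SingleOperator, MaybeTransformer and MaybeOperator, and CompletableTransformer and CompletableOperator.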
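
As a rough sketch of what such a counterpart can look like, the hypothetical toSingletonList() below (not an RxJava API; RxJava 2 package names assumed) implements SingleOperator and is applied through Single.lift(). A Single has no onNext() or onComplete(); the only events to relay are onSubscribe(), onSuccess(), and onError():

```kotlin
import io.reactivex.Single
import io.reactivex.SingleObserver
import io.reactivex.SingleOperator
import io.reactivex.disposables.Disposable

// Hypothetical operator: wraps the success value in a one-element List.
fun <T : Any> toSingletonList(): SingleOperator<List<T>, T> {
    return SingleOperator { downstream ->
        object : SingleObserver<T> {
            override fun onSubscribe(d: Disposable) = downstream.onSubscribe(d)
            override fun onSuccess(t: T) = downstream.onSuccess(listOf(t))
            override fun onError(e: Throwable) = downstream.onError(e)
        }
    }
}

fun main() {
    val list = Single.just(42)
        .lift(toSingletonList<Int>())
        .blockingGet()
    println("Received: $list") // Received: [42]
}
```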


Implementing operators is something to be conservative about, to be pursued only when all other options have been exhausted. It may be worthwhile to explore the RxJava2-Extras and RxJava2Extensions libraries for additional operators beyond what RxJava provides.

Mouaad Aallam
