RxJava: Subscriber

- 4 mins

To consume emissions, Flowable uses a Subscriber instead of an Observer, and return a Subscription instead of a Disposable. The Subscription can communicate upstream how many items are wanted using its request() method.

A simple way to subscribe to a Flowable is by using lambdas in the subscribe() method:

fun main() {
    Flowable.range(1, 1_000)
        .doOnNext { println("Source pushed: $it") }
        .observeOn(Schedulers.io())
        .map { intenseCalculation(it) }
        .subscribe({ i -> println("Received $i") }, Throwable::printStackTrace, { println("Done !") })

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}

Another way, is to pass a Subscriber as a parameter to the subscribe() method. However unlike Observer, the method request() must be called on Subscription to request emissions at the right moments. The fastest way to do this, is by calling request(Long.MAX_VALUE):

fun main() {
    val subscriber = object : Subscriber<Int> {
        override fun onSubscribe(subscription: Subscription?) {
            subscription?.request(Long.MAX_VALUE)
        }

        override fun onNext(value: Int?) {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Subscriber received: $value")
        }

        override fun onError(throwable: Throwable?) {
            throwable?.printStackTrace()
        }

        override fun onComplete() {
            println("Done!")
        }
    }

    Flowable.range(1, 1_000)
        .doOnNext { println("Source pushed: $it") }
        .observeOn(Schedulers.io())
        .map { intenseCalculation(it) }
        .subscribe(subscriber)

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}

This means no backpressure will exist between the last operator and the Subscriber. This is usually fine since the upstream operators will constrain the flow anyway. If the goal is to establish an explicit backpressured relationship with the operator preceding the Subscriber, the method request() should be called to change the pace of emissions:

fun main() {
    val subscriber = object : Subscriber<Int> {

        var subscription: Subscription? = null
        var count = AtomicInteger(0)

        override fun onSubscribe(subscription: Subscription?) {
            this.subscription = subscription
            println("Requesting 40 items")
            subscription?.request(40)
        }

        override fun onNext(value: Int?) {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Subscriber received: $value")
            if (count.incrementAndGet() % 20 == 0 && count.get() >= 40) {
                println("Requesting 20 items")
                subscription?.request(20)
            }
        }

        override fun onError(throwable: Throwable?) {
            throwable?.printStackTrace()
        }

        override fun onComplete() {
            println("Done!")
        }
    }

    Flowable.range(1, 1_000)
        .doOnNext { println("Source pushed: $it") }
        .observeOn(Schedulers.io())
        .map { intenseCalculation(it) }
        .subscribe(subscriber)

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}
Requesting 40 items
Source pushed: 1
Source pushed: 2
Source pushed: 3
...
Source pushed: 127
Source pushed: 128
Subscriber received: 1
Subscriber received: 2
...
Subscriber received: 39
Subscriber received: 40
Requesting 20 items
Subscriber received: 41
Subscriber received: 42
...

In the previous example the Subscriber will request 40 emissions initially and then request 20 emissions at a time after that. Note that the request() calls do not go all the way upstream, they only go to the preceding operator, which decides how to relay that request upstream.

Mouaad Aallam

Mouaad Aallam

Computer Software Engineer

rss facebook twitter github youtube mail spotify instagram linkedin google pinterest medium vimeo gitlab docker