RxJava: Flowable


The Flowable is a backpressured variant of the Observable: the source emits at a pace dictated by the downstream operations. Let’s replace Observable.range() with Flowable.range():

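import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Assumed definition of Item (not shown in this section): logs its own construction
class Item(val id: Int) {
    init { println("Constructing Item $id") }
}

fun main() {
    Flowable.range(1, 999_999_999)
        .map(::Item)
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(50) // simulate a slow consumer
            println("Received item ${it.id}")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}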
Constructing Item 1
Constructing Item 2
...
Constructing Item 127
Constructing Item 128
Received item 1
Received item 2
...
Received item 95
Received item 96
Constructing Item 129
Constructing Item 130
...
Constructing Item 223
Constructing Item 224
Received item 97
Received item 98
...

Some parts of the output are omitted, but the behavior is clear: 128 emissions were immediately pushed from Flowable.range(). observeOn() then passed 96 of them downstream to the Subscriber (yes, a Subscriber, not an Observer), and only after that did the source emit the next 96 items (129 to 224). This behavior of never having more than a certain number of emissions in the pipeline at any given time is called backpressure.
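
The request-driven mechanism behind this output can be observed without sleeping threads. The sketch below assumes RxJava 2.x on the classpath; the helpers requestedOnly and upstreamRequests are hypothetical names. The first subscribes with test(0), so nothing is requested, and then asks for exactly n items. The second uses doOnRequest() to record every request that observeOn() sends to the source.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.Collections

// Hypothetical helper: subscribes without requesting anything, then requests n items.
// Flowable.range() only emits what has been requested.
fun requestedOnly(n: Long): List<Int> {
    val subscriber = Flowable.range(1, 1_000).test(0) // initial request: 0
    subscriber.request(n)
    return subscriber.values()
}

// Hypothetical helper: records every request that observeOn() sends upstream.
fun upstreamRequests(): List<Long> {
    val requests = Collections.synchronizedList(mutableListOf<Long>())
    Flowable.range(1, 1_000)
        .doOnRequest { requests.add(it) } // receives the amount requested by observeOn()
        .observeOn(Schedulers.io())
        .toList()
        .blockingGet() // wait until all 1,000 items went through
    return requests.toList()
}
```

With the default buffer size, upstreamRequests() should start with a request of 128, followed by requests of 96: observeOn() replenishes once three quarters of its buffer has been consumed, which matches the 128/96 pattern in the output above.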

Flowables, when?

The Flowable offers two benefits: leaner memory usage and protection against MissingBackpressureException. The drawback is that it adds overhead and may not perform as quickly as an Observable. So when should you use a Flowable?

  1. When dealing with more than 10,000 elements and the source can be made to emit at a regulated pace.
  2. When emitting from I/O operations that block while returning results, for example data sources that iterate over records (file lines, JDBC’s ResultSets…), or network and streaming APIs that can request a given number of results.
  3. When the stream is asynchronous, for example when zipping or combining streams on different threads, parallelizing, or using operators such as observeOn(), interval(), and delay().
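
Point 2 relies on the source being pulled only on demand. A minimal sketch, assuming RxJava 2.x: the hypothetical CountingLines class stands in for a record source (file lines, a ResultSet…) and counts how many records are actually read. Flowable.fromIterable() pulls from the iterator only as items are requested, so requesting 3 records out of 10,000 reads only 3.

```kotlin
import io.reactivex.Flowable

// Hypothetical record source: wraps a list of lines and counts how many were actually read
class CountingLines(private val lines: List<String>) : Iterable<String> {
    var reads = 0
        private set

    override fun iterator(): Iterator<String> = object : Iterator<String> {
        private val delegate = lines.iterator()
        override fun hasNext(): Boolean = delegate.hasNext()
        override fun next(): String {
            reads++
            return delegate.next()
        }
    }
}

// Subscribes without requesting, then requests n records; returns (records, records read)
fun readOnDemand(n: Long): Pair<List<String>, Int> {
    val source = CountingLines(List(10_000) { "line $it" })
    val subscriber = Flowable.fromIterable(source).test(0) // nothing requested yet
    subscriber.request(n)
    return subscriber.values() to source.reads
}
```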

Note: in RxJava 1.x, the Observable was backpressured; it played the role that the Flowable plays in RxJava 2.x.

MissingBackpressureException

Flowable has factories such as Flowable.range(), Flowable.just(), Flowable.fromIterable(), and Flowable.interval(). Most of them implement backpressure, and their usage is generally the same as their Observable equivalents. Flowable.interval(), however, pushes time-based emissions at fixed time intervals. It can’t logically be backpressured: slowing down its emissions would no longer reflect the time intervals and would be misleading. For that reason, Flowable.interval() is one of the few cases that can signal a MissingBackpressureException as soon as the downstream falls behind and has no outstanding request for the next emission:

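import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical helper (not shown in this section): simulates slow work so the downstream falls behind
fun <T> intenseCalculation(value: T): T = value.also { TimeUnit.MILLISECONDS.sleep(50) }

fun main() {
    Flowable.interval(1, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.io())
        .map { intenseCalculation(it) }
        .subscribe({ i -> println("Received item $i") }, Throwable::printStackTrace)

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}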
Received item 0
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
... 

A solution for this issue is to use operators such as onBackpressureDrop() or onBackpressureBuffer().
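
The failure can be reproduced deterministically with a minimal sketch, assuming RxJava 2.x (failsWithoutRequests is a hypothetical helper). Subscribing with test(0) means nothing is ever requested, so the very first tick of Flowable.interval() has nowhere to go. Inserting onBackpressureDrop() absorbs the ticks instead, and no error is signaled.

```kotlin
import io.reactivex.Flowable
import io.reactivex.exceptions.MissingBackpressureException
import java.util.concurrent.TimeUnit

// Hypothetical helper: subscribes without requesting anything, waits, and reports
// whether the stream failed with MissingBackpressureException
fun failsWithoutRequests(dropOverflow: Boolean): Boolean {
    val ticks = Flowable.interval(1, TimeUnit.MILLISECONDS)
    val source = if (dropOverflow) ticks.onBackpressureDrop() else ticks
    val subscriber = source.test(0) // the downstream requests 0 items
    subscriber.awaitDone(100, TimeUnit.MILLISECONDS) // returns on error, or cancels on timeout
    val failed = subscriber.errors().any { it is MissingBackpressureException }
    subscriber.dispose()
    return failed
}
```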

Creating a Flowable

Leveraging Flowable.create() to create a Flowable feels much like Observable.create(), but there is one critical difference: a BackpressureStrategy must be passed as the second argument. This enum provides simple backpressure support: not implementing it, signaling an error, caching emissions, or dropping them.

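import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    val source = Flowable.create<Int>({ emitter ->
        for (i in 0..1_000) {
            if (emitter.isCancelled) return@create // stop if the downstream cancelled
            emitter.onNext(i)
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER) // queue emissions until the downstream requests them

    source.observeOn(Schedulers.io())
        .subscribe { println("Received item $it") }

    TimeUnit.SECONDS.sleep(1)
}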

In the previous example, Flowable.create() is used to create a Flowable, with BackpressureStrategy.BUFFER as the second argument, so emissions are buffered until the downstream requests them.

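The following are the possible BackpressureStrategy options:

  - MISSING: no backpressure strategy is applied; the downstream has to handle overflow itself, for example with the onBackpressureXXX() operators.
  - ERROR: signals a MissingBackpressureException as soon as the downstream can’t keep up.
  - BUFFER: queues emissions in an unbounded buffer until the downstream can process them (which may lead to an OutOfMemoryError).
  - DROP: drops emissions when the downstream is not ready to receive them.
  - LATEST: keeps only the latest emission until the downstream is ready to receive it.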
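
The five BackpressureStrategy values can be compared deterministically. In this sketch (assuming RxJava 2.x; deliveredAfterRequest is a hypothetical helper), a Flowable.create() source emits ten items immediately on subscription, before the subscriber has requested anything, and the subscriber then requests n items.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical helper: emits 0..9 synchronously while nothing is requested, then requests n items
fun deliveredAfterRequest(strategy: BackpressureStrategy, n: Long): List<Int> {
    val source = Flowable.create<Int>({ emitter ->
        for (i in 0 until 10) emitter.onNext(i)
        emitter.onComplete()
    }, strategy)
    val subscriber = source.test(0) // subscribed, but no request yet
    subscriber.request(n)
    return subscriber.values()
}
```

With n = 3, BUFFER delivers 0, 1 and 2; LATEST delivers only 9; DROP delivers nothing; ERROR delivers nothing and fails with MissingBackpressureException; MISSING ignores requests entirely and pushes all ten items (the test subscriber does not enforce its own request count).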

Backpressure Operators

If a Flowable has no backpressure implementation (including one derived from an Observable), a backpressure strategy can be applied at that point with the onBackpressureXXX() operators, which also offer a few additional configuration options.
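
A minimal sketch, assuming RxJava 2.x (bufferedFromObservable is a hypothetical helper): Observable.toFlowable(BackpressureStrategy.MISSING) produces a Flowable that ignores requests, and onBackpressureBuffer() applied right after it holds the emissions until the subscriber asks for them.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable

// Hypothetical helper: converts an Observable without a strategy, then buffers downstream of it
fun bufferedFromObservable(n: Long): List<Int> {
    val subscriber = Observable.range(0, 10)
        .toFlowable(BackpressureStrategy.MISSING) // no backpressure handling yet
        .onBackpressureBuffer()                   // apply BUFFER at this point
        .test(0)                                  // nothing requested while the items are emitted
    subscriber.request(n)
    return subscriber.values()
}
```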

onBackpressureBuffer()

The onBackpressureBuffer() operator takes an existing Flowable that is assumed to have no backpressure implementation and applies BackpressureStrategy.BUFFER at that point for the downstream:

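import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    Flowable.interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureBuffer() // unbounded buffer between interval() and observeOn()
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(5) // slow consumer
            println("Received item $it")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}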
Received item 0
Received item 1
Received item 2
Received item 3
Received item 4
...

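onBackpressureBuffer() accepts a number of arguments; the most common are the buffer capacity, an onOverflow action, and a BackpressureOverflowStrategy that decides what happens when the buffer is full: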

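import io.reactivex.BackpressureOverflowStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    Flowable.interval(1, TimeUnit.MILLISECONDS)
        // capacity, action run on each overflow, and what to drop when the buffer is full
        .onBackpressureBuffer(10, { println("Overflow!") }, BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Received item $it")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}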
...
Received item 22
Received item 23
Overflow!
Overflow!
Overflow!
Overflow!
Received item 24
Overflow!
Overflow!
Overflow!
Overflow!
Overflow!
Overflow!
Received item 25
Overflow!
Overflow!
...
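
The overflow strategy is easier to see without timing. In this sketch (assuming RxJava 2.x; dropOldest is a hypothetical helper), Flowable.range() pushes ten items into a buffer of capacity 3 while nothing is requested. With BackpressureOverflowStrategy.DROP_OLDEST, each overflow evicts the oldest buffered item, so only the last three survive and the onOverflow action runs seven times.

```kotlin
import io.reactivex.BackpressureOverflowStrategy
import io.reactivex.Flowable

// Hypothetical helper: returns (items received, number of overflows)
fun dropOldest(): Pair<List<Int>, Int> {
    var overflows = 0
    val subscriber = Flowable.range(0, 10)
        .onBackpressureBuffer(3, { overflows++ }, BackpressureOverflowStrategy.DROP_OLDEST)
        .test(0)           // the whole range is emitted while nothing is requested
    subscriber.request(10) // now drain what is left in the buffer
    return subscriber.values() to overflows
}
```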

onBackpressureLatest()

The onBackpressureLatest() operator retains only the latest value from the source while the downstream is busy, and delivers it once the downstream is free to process more. Any previous values emitted during this busy period are lost:

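import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    Flowable.interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureLatest() // keep only the most recent tick while the downstream is busy
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Received item $it")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}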
...
Received item 125
Received item 126
Received item 127
Received item 566
Received item 567
...
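
A deterministic sketch, assuming RxJava 2.x, uses a PublishProcessor as a hot source that ignores backpressure (latestOnly is a hypothetical helper). The subscriber first asks for a single item, three values are pushed, and only the most recent one survives until the next request.

```kotlin
import io.reactivex.processors.PublishProcessor

// Hypothetical helper: pushes 1, 2, 3 while only one item was requested, then requests one more
fun latestOnly(): List<Int> {
    val source = PublishProcessor.create<Int>()
    val subscriber = source.onBackpressureLatest().test(1) // the downstream requests one item
    source.onNext(1) // delivered: one item was requested
    source.onNext(2) // no outstanding request: kept as the latest value
    source.onNext(3) // replaces 2, which is lost
    subscriber.request(1) // the downstream is free again and receives 3
    return subscriber.values()
}
```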

onBackpressureDrop()

The onBackpressureDrop() operator discards emissions when the downstream is too busy to process them. It accepts an optional onDrop lambda specifying what to do with each dropped item.

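import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    Flowable.interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureDrop { println("Drop: $it") } // called for each discarded tick
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Received item $it")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}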
...
Received item 19
Received item 20
Drop: 128
Received item 21
Drop: 129
Drop: 130
...
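
The same PublishProcessor setup shows the difference from onBackpressureLatest(): nothing is kept while the downstream is busy. A sketch assuming RxJava 2.x (dropWhileBusy is a hypothetical helper) that returns both the received and the dropped items.

```kotlin
import io.reactivex.processors.PublishProcessor

// Hypothetical helper: returns (items received, items passed to the onDrop lambda)
fun dropWhileBusy(): Pair<List<Int>, List<Int>> {
    val dropped = mutableListOf<Int>()
    val source = PublishProcessor.create<Int>()
    val subscriber = source.onBackpressureDrop { dropped.add(it) }.test(1)
    source.onNext(1) // delivered: one item was requested
    source.onNext(2) // no outstanding request: dropped
    source.onNext(3) // dropped
    subscriber.request(1)
    source.onNext(4) // delivered
    return subscriber.values() to dropped
}
```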

Flowable.generate()

Most of Flowable’s standard factories and operators handle backpressure automatically. For custom sources, however, Flowable.create() and the onBackpressureXXX() operators are somewhat compromised in how they handle backpressure requests: caching emissions or simply dropping them is not always desirable. Flowable.generate() exists to help create backpressure-respecting sources at a nicely abstracted level.

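import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
import kotlin.random.Random

fun main() {
    // The generator is invoked once per requested item and must call onNext() at most once
    Flowable.generate<Int> { emitter -> emitter.onNext(Random.nextInt(1, 1_000)) }
        .subscribeOn(Schedulers.computation())
        .doOnNext { println("Emitting $it") }
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Received item $it")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}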
Emitting 577
Emitting 597
...
Emitting 235
Emitting 70
Received item 577
Received item 597
...
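
Because the generator only runs when there is an outstanding request, its invocations can be counted. A sketch assuming RxJava 2.x (generateOnDemand is a hypothetical helper): subscribing with test(0) does not call the generator at all, and requesting n items calls it exactly n times.

```kotlin
import io.reactivex.Flowable

// Hypothetical helper: returns (items received, number of generator invocations)
fun generateOnDemand(n: Long): Pair<List<Int>, Int> {
    var calls = 0
    val subscriber = Flowable.generate<Int> { emitter ->
        calls++
        emitter.onNext(calls) // exactly one onNext() per invocation
    }.test(0) // subscribed, nothing requested: the generator has not run yet
    subscriber.request(n)
    return subscriber.values() to calls
}
```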

Note: calling onNext() more than once within a single invocation of the Consumer<Emitter<T>> results in an IllegalStateException being signaled downstream.
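
A small sketch, assuming RxJava 2.x (doubleOnNextError is a hypothetical helper), checks this behavior. The generator below deliberately calls onNext() twice in one invocation; the first value is delivered and the IllegalStateException then reaches the subscriber through onError rather than being thrown to the caller.

```kotlin
import io.reactivex.Flowable

// Hypothetical helper: the generator (incorrectly) emits twice in the same invocation
fun doubleOnNextError(): Throwable? {
    val subscriber = Flowable.generate<Int> { emitter ->
        emitter.onNext(1)
        emitter.onNext(2) // second onNext() in the same invocation: not allowed
    }.test(1)
    return subscriber.errors().firstOrNull()
}
```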

It is also possible to provide an initial state that acts somewhat like a “seed” and is passed from one emission to the next:

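import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiConsumer
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

fun main() {
    Flowable.generate<Int, AtomicInteger>(
        Callable<AtomicInteger> { AtomicInteger(1) }, // initial state ("seed")
        BiConsumer<AtomicInteger, Emitter<Int>> { state, emitter -> emitter.onNext(state.getAndIncrement()) }
    )
        .subscribeOn(Schedulers.computation())
        .doOnNext { println("Emitting $it") }
        .observeOn(Schedulers.io())
        .subscribe {
            TimeUnit.MILLISECONDS.sleep(5)
            println("Received item $it")
        }

    TimeUnit.SECONDS.sleep(Long.MAX_VALUE)
}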

It is also possible to provide a third Consumer<? super S> disposeState argument to perform any cleanup when the Flowable terminates or is cancelled.
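
A sketch using all three arguments, assuming RxJava 2.x. It swaps in the generate() overload that takes a BiFunction returning the next state, instead of the BiConsumer used above, and a hypothetical FakeReader stands in for a resource such as a file or a JDBC ResultSet. The disposeState consumer closes the reader both when the source completes and when the downstream cancels (here, through take()).

```kotlin
import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Consumer
import java.util.concurrent.Callable

// Hypothetical resource standing in for a file reader or a JDBC ResultSet
class FakeReader(private val lines: List<String>) {
    private var index = 0
    var closed = false
        private set

    fun nextLine(): String? = lines.getOrNull(index++)
    fun close() { closed = true }
}

// Reads at most `count` lines; the reader is closed on completion or cancellation
fun readFirstLines(reader: FakeReader, count: Long): List<String> =
    Flowable.generate<String, FakeReader>(
        Callable { reader }, // initial state
        BiFunction<FakeReader, Emitter<String>, FakeReader> { r, emitter ->
            val line = r.nextLine()
            if (line != null) emitter.onNext(line) else emitter.onComplete()
            r // state passed to the next invocation
        },
        Consumer<FakeReader> { it.close() } // disposeState: release the resource
    )
        .take(count)
        .toList()
        .blockingGet()
```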

Flowable.generate() provides a nicely abstracted mechanism for creating a source that respects backpressure, which makes it preferable to Flowable.create() when caching or dropping emissions must be avoided.

Mouaad Aallam


Computer Software Engineer
