RxJava: Transformers

- 5 mins

RxJava provides the possibility to reuse pieces of Observable or Flowable chains and combine these operators into a new operator. using ObservableTransformer and FlowableTransformer.

ObservableTransformer

Lets take the following example:

fun main() {
    // Letters
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .collect(::StringBuilder) { sb, v -> sb.append("$v ") }
        .map(StringBuilder::toString)
        .subscribe { s -> println(s) }

    // Numbers
    Observable.range(1, 15)
        .collect(::StringBuilder) { sb, v -> sb.append("$v ") }
        .map(StringBuilder::toString)
        .subscribe { s -> println(s) }
}
Alpha Beta Gamma Delta Epsilon 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 

The piece of code that is chaining of the two operators collect() and map() is the same! Enhance the code reusability using ObservableTransformer and compose():

fun main() {
    // Letters
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .compose(toString())
        .subscribe { l -> println(l) }

    // Numbers
    Observable.range(1, 15)
        .compose(toString())
        .subscribe { l -> println(l) }
}

fun <T> toString(): ObservableTransformer<T, String> {
    return ObservableTransformer { upstream ->
        upstream
            .collect(::StringBuilder) { sb, v -> sb.append("$v ") }
            .map(StringBuilder::toString)
            .toObservable()
    }
}
Alpha Beta Gamma Delta Epsilon 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 

FlowableTransformer

The FlowableTransformer isn’t much different from ObservableTransformer. Of course, it supports backpressure since it’s composed with Flowables:

fun main() {
    // Letters
    Flowable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .compose(toString())
        .subscribe { l -> println(l) }

    // Numbers
    Flowable.range(1, 15)
        .compose(toString())
        .subscribe { l -> println(l) }
}

fun <T> toString(): FlowableTransformer<T, String> {
    return FlowableTransformer { upstream ->
        upstream
            .collect(::StringBuilder) { sb, v -> sb.append("$v ") }
            .map(StringBuilder::toString)
            .toFlowable()
    }
}
Alpha Beta Gamma Delta Epsilon 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 

Shared States

When creating Transformers (and custom operators), state sharing between more than one subscription can cause unwanted behaviors and side effects.

fun main() {
    val source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .compose(withIndex())
    source.subscribe { println("Subscriber 1: $it") }
    source.subscribe { println("Subscriber 1: $it") }
}

fun <T> withIndex(): ObservableTransformer<T, IndexedValue<T>> {
    val indexer = AtomicInteger(-1) // Bad idea!
    return ObservableTransformer { upstream ->
        upstream.map { v ->
            IndexedValue(indexer.incrementAndGet(), v)
        }
    }
}

data class IndexedValue<T>(val index: Int, val value: T)
Subscriber 1: IndexedValue(index=0, value=Alpha)
Subscriber 1: IndexedValue(index=1, value=Beta)
Subscriber 1: IndexedValue(index=2, value=Gamma)
Subscriber 1: IndexedValue(index=3, value=Delta)
Subscriber 1: IndexedValue(index=4, value=Epsilon)
Subscriber 1: IndexedValue(index=5, value=Alpha) // Oops!
Subscriber 1: IndexedValue(index=6, value=Beta)
Subscriber 1: IndexedValue(index=7, value=Gamma)
Subscriber 1: IndexedValue(index=8, value=Delta)
Subscriber 1: IndexedValue(index=9, value=Epsilon)

A single instance (and state) of AtomicInteger is shared between both subscriptions. On the second subscription, instead of starting over at 0, it picks up at the index left by the earlier subscription and starts at index 5 since it ended at 4.


Note: in Kotlin, instead of using Transformers, it’s possible to leverage extension functions to add operators to Observable and Flowable types.

Mouaad Aallam

Mouaad Aallam

Software Engineer

rss facebook twitter github youtube mail spotify instagram linkedin google pinterest medium vimeo mastodon gitlab docker