RxJava: Transforming Operators

- 6 mins

Transforming operators are a series of operators in an Observable chain to transform emissions.

map()

The map() operator will transform a T emission for a given Observable<T> into an R emission using the provided lambda:

fun main() {
    val dateTimeFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy")
    Observable.just("01/01/2019", "31/03/2020", "07/07/2018")
        .map { LocalDate.parse(it, dateTimeFormatter) }
        .subscribe { println("Received: $it") }
}
Received: 2019-01-01
Received: 2020-03-31
Received: 2018-07-07

The map() operator in this previous example transforms a String to a LocalDate object.
The map() operator does a one-to-one conversion. To do a one-to-many conversion flatMap() or concatMap() are more appropriate.

cast()

This operator is to cast each emission to a different type:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .cast(CharSequence::class.java)
        .subscribe { println("Received: $it") }
}

The Observer will receive CharSequence emissions.

startWith()

The startWith() operator allows to insert a T emission that precedes all the other emissions in a given Observable<T>:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .startWith("GREEK ALPHABET:")
        .subscribe { println("Received: $it") }
}
Received: GREEK ALPHABET:
Received: Alpha
Received: Beta
Received: Gamma

There is too the operator startWithArray() : to start with more than one emission, it accepts vararg parameter:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .startWithArray("GREEK ALPHABET:", "---------------")
        .subscribe { println("Received: $it") }
}
Received: GREEK ALPHABET:
Received: ---------------
Received: Alpha
Received: Beta
Received: Gamma

The operators concat() and concatWith() are good to have an entire emissions of Observable to precede emissions of another Observable.

defaultIfEmpty()

The operator defaultIfEmpty() is to get a single emission if a given Observable comes out empty:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .filter { it.length < 3 }
        .defaultIfEmpty("None")
        .subscribe { println("Received: $it") }
}
Received: None

switchIfEmpty()

switchIfEmpty() specifies a different Observable to emit values from if the source Observable is empty:

fun main() {
    val altObservable = Observable.just("Zeta", "Eta", "Theta")
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .filter { it.startsWith("Z") }
        .switchIfEmpty(altObservable)
        .subscribe { i -> println("RECEIVED: $i") }
} 
Received: Zeta
Received: Eta
Received: Theta

sorted()

In case of a finite Observable<T> emitting items that implement Comparable<T>, the operator sorted() can sort the emissions. Internally, it will collect all the emissions and re-emit them sorted:

fun main() {
    Observable.just(4, 3, 1, 2, 1)
        .sorted()
        .subscribe { print("$it ") }
}
1 1 2 3 4 

It’s possible to provide a Comparator (as object or lambda) to specify the sorting criterion:

fun main() {
    Observable.just(4, 3, 1, 2, 1)
        .sorted(Comparator.reverseOrder())
        .subscribe { print("$it ") }
}
4 3 2 1 1 

delay()

delay() operator postpones emissions, holds any received emissions and delay each one for the specified time period:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma")
        .delay(3, TimeUnit.SECONDS)
        .subscribe { println("Received: $it") }

    TimeUnit.SECONDS.sleep(5)
}
Received: Alpha
Received: Beta
Received: Gamma

It’s possible to pass another Observable as argument to delay(), and it will delay emissions until that other Observable emits something.

repeat()

The repeat() operator will repeat subscription upstream after onComplete() a specified number of times. If no number is provided, it will repeat infinitely, forever re-subscribing after every onComplete()

fun main() {
    Observable.just("Alpha", "Beta", "Epsilon")
        .repeat(2)
        .subscribe { println("Received: $it") }
}
Received: Alpha
Received: Beta
Received: Epsilon
Received: Alpha
Received: Beta
Received: Epsilon

There is also a repeatUntil() operator to keep repeating until the passed Boolean supplier yields false.

scan()

The scan() operator is a rolling aggregator. _It will emit after each upstream emission the new accumulation:

fun main() {
    Observable.just(1, 1, 2, 3, 5)
        .scan { accumulator, next -> accumulator + next }
        .subscribe { println("Received: $it") }
}
Received: 1
Received: 2
Received: 4
Received: 7
Received: 12

It’s possible to provide an initial value for the first argument and aggregate into a different type than what is being emitted.

Mouaad Aallam

Mouaad Aallam

Software Engineer

rss facebook twitter github youtube mail spotify instagram linkedin google pinterest medium vimeo mastodon gitlab docker