RxJava: Supressing Operators

- 8 mins

The operators that will suppress emissions that fail to meet a specified criterion are Suppressing operators.

filter()

The filter() operator accepts a lambda that qualifies each emission by mapping it to a Boolean value, and emissions with false will not go forward:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .filter { it.length != 5 }
        .subscribe { println("Received: $it") }
}
Received: Beta
Received: Epsilon

take()

This operator has two overloads. The first will take a specified number of emissions and then call onComplete() after it captures all of them:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .take(3)
        .subscribe { println("Received: $it") }
}
Received: Alpha
Received: Beta
Received: Gamma

The other overload of filter() will take emissions within a specific time duration and then call onComplete():

fun main() {
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .take(1, TimeUnit.SECONDS)
        .subscribe { println("Received: $it") }

    TimeUnit.SECONDS.sleep(5)
}
Received: 0
Received: 1
Received: 2

There is also a takeLast() operator, which will take the last specified number of emissions (or time duration) before the onComplete() is called.

skip()

skip() is the opposite of take() operator. It will ignore the specified number of emissions and then emit the ones that follow:

fun main() {
    Observable.range(1, 10)
        .skip(7)
        .subscribe { println("Received: $it") }
}
Received: 8
Received: 9
Received: 10

Like the take() operator, there is also an overload accepting a time duration and a skipLast() operator.

takeWhile() & takeUntil()

takeWhile() operator is a variant of the take() operator: it takes emissions while a condition is true. Once the condition is not satisfied, onComplete() is called:

fun main() {
    Observable.range(1, 10)
        .takeWhile { it < 5 }
        .subscribe { println("Received: $it") }
}
Received: 1
Received: 2
Received: 3
Received: 4

The takeUntil() operator is similar to takeWhile(), but it accepts another Observable as a parameter. It will keep taking emissions until that other Observable pushes an emission:

fun main() {
    val observable = Observable.interval(1, TimeUnit.SECONDS)

    // Will start emissions at 300 milliseconds
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .takeUntil(observable) // Receives first emission at 1 sec
        .subscribe { println("Received: $it") }

    TimeUnit.SECONDS.sleep(3)
}
Received: 0
Received: 1
Received: 2

skipWhile() & skipUntil()

This operator will keep skipping emissions while the condition is satisfied. Once the condition no longer qualifies, the emissions will start going through:

fun main() {
    Observable.range(1, 10)
        .skipWhile { it <= 95 }
        .subscribe { println("Received: $it") }
}
Received: 8
Received: 9
Received: 10

The skipUntil() operator accepts another Observable as an argument but it will keep skipping until the other Observable emits something:

fun main() {
    val observable = Observable.interval(1, TimeUnit.SECONDS)

    // Will start emissions at 300 milliseconds
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .skipUntil(observable) // Receives first emission at 1 sec
        .subscribe { println("Received: $it") }

    TimeUnit.SECONDS.sleep(2)
}
Received: 3
Received: 4
Received: 5

distinct()

The distinct() operator will emit each unique emission and suppress any duplicates that follow. Equality is based on hashCode()/equals() implementation of the emitted objects:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .map{ it.length }
        .distinct()
        .subscribe { println("Received: $it") }
}
Received: 5
Received: 4
Received: 7

It’s possible to add a lambda argument that maps each emission to a key used for equality logic. This allows the emissions, but not the key, to go forward while using the key for distinct logic:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
        .distinct{ it.length }
        .subscribe { println("Received: $it") }
}
Received: Alpha
Received: Beta
Received: Epsilon

distinctUntilChanged()

This operator function will ignore duplicate consecutive emissions. All the duplicates will be ignored until a new value is emitted :

fun main() {
    Observable.just(1, 1, 1, 2, 2, 3, 3, 2, 1, 1)
        .distinctUntilChanged()
        .subscribe { println("Received: $it") }
}
Received: 1
Received: 2
Received: 3
Received: 2
Received: 1

It’s possible to provide a lambda to map the emissions and use the result as key:

fun main() {
    Observable.just("Alpha", "Beta", "Zeta", "Eta", "Gamma", "Delta")
        .distinctUntilChanged { s -> s.length }
        .subscribe { println("Received: $it") }
}
Received: Alpha
Received: Beta
Received: Eta
Received: Gamma

elementAt()

It’s possible to get a specific emission by its index specified by a Long, starting at 0. When the item is found and emitted, onComplete() will be called and dispose of the subscription.

fun main() {
    Observable.just("Alpha", "Beta", "Zeta", "Eta", "Gamma", "Delta")
        .elementAt(3)
        .subscribe { println("Received: $it") }
}
Received: Eta

There are other flavours of elementAt():

Mouaad Aallam

Mouaad Aallam

Software Engineer

rss facebook twitter bsky github youtube mail spotify instagram linkedin google pinterest medium vimeo mastodon gitlab docker