RxJava: Collection Operators

Collection operators can be seen as reducing operators, since they consolidate all emissions into a single one: they accumulate the emissions into a collection (List, Map, Set…) and emit that collection once the source completes.

toList()

For a given Observable<T>, this operator will collect incoming emissions into a List<T> and then push it as a single emission (Single<List<T>>):

fun main() {
    Observable.just(1, 2, 3, 4)
        .toList() // Emits a single List once the source completes
        .subscribe { l -> println("Received: $l") }
} 
Received: [1, 2, 3, 4]

It’s possible to specify the List<T> implementation:

fun main() {
    Observable.just(1, 2, 3, 4)
        .toList { CopyOnWriteArrayList<Int>() }
        .subscribe { l -> println("Received: ${l.javaClass}") }
}
Received: class java.util.concurrent.CopyOnWriteArrayList

toSortedList()

This operator will collect the emissions into a List sorted according to the natural order of the items, as defined by their Comparable implementation:

fun main() {
    Observable.just("Beta", "Alpha", "Delta", "Gamma", "Zeta", "Eta")
        .toSortedList()
        .subscribe { l -> println("Received: $l") }
}
Received: [Alpha, Beta, Delta, Eta, Gamma, Zeta]

It’s possible to provide a Comparator as an argument to apply a different sorting logic.
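
As a minimal sketch of the Comparator variant, the snippet below sorts the same emissions in reverse natural order. It assumes RxJava 3 (with RxJava 2 the import would be io.reactivex.Observable), and sortedDescending() is a hypothetical helper introduced only for this illustration:

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 on the classpath

// Hypothetical helper: collects the emissions into a List sorted in reverse natural order.
fun sortedDescending(): List<String> =
    Observable.just("Beta", "Alpha", "Delta", "Gamma", "Zeta", "Eta")
        .toSortedList(reverseOrder<String>()) // Comparator from the Kotlin standard library
        .blockingGet() // blocks until the Single emits the sorted list

fun main() {
    println("Received: ${sortedDescending()}")
    // Received: [Zeta, Gamma, Eta, Delta, Beta, Alpha]
}
```

Any other Comparator<String> could be passed the same way, for example compareBy<String> { it.length } to sort by length.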

toMap()

This operator will collect emissions into a Map<K, T> for a given Observable<T>, where K is the key type derived from a lambda Function<T, K>:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap { s -> s[0] }
        .subscribe { m -> println("Received: $m") }
}
Received: {A=Alpha, B=Beta, D=Delta, E=Epsilon, G=Gamma, Z=Zeta}

To associate each key with a value other than the emission itself, provide a second lambda argument that maps each emission to that value:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap({ s -> s[0] }, { s -> s.length })
        .subscribe { m -> println("Received: $m") }
}
Received: {A=5, B=4, D=5, E=7, G=5, Z=4}

By default, toMap() will use a HashMap. It’s possible to pass a third lambda argument that supplies a different map implementation:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap({ s -> s[0] }, { s -> s.length }, { ConcurrentHashMap() })
        .subscribe { m -> println("Received: ${m.javaClass}") }
}
Received: class java.util.concurrent.ConcurrentHashMap

toMultimap()

With the toMap() operator, if several emissions map to the same key, the last emission for that key replaces the previous ones:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap { s -> s.length }
        .subscribe { m -> println("Received: $m") }
}
Received: {4=Zeta, 5=Delta, 7=Epsilon}

However, it’s possible to map multiple values to the same key using toMultimap():

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMultimap { s -> s.length }
        .subscribe { m -> println("Received: $m") }
}
Received: {4=[Beta, Zeta], 5=[Alpha, Gamma, Delta], 7=[Epsilon]}

collect()

When none of the operators above provides what is needed, it’s possible to use the collect() operator to specify a different type to collect items into:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .collect({ HashSet<String>() }, { s, v -> s.add(v) })
        .subscribe { s -> println("Received: ${s.javaClass} : $s") }
}
Received: class java.util.HashSet : [Gamma, Zeta, Delta, Alpha, Epsilon, Beta]

When putting emissions into a mutable object seed, it’s better to use collect() instead of reduce().
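
One reason for this advice, as noted in the RxJava Javadoc, is that the seed passed to reduce(seed, ...) is shared among all subscribers, whereas collect() asks its supplier for a fresh container on each subscription. The sketch below illustrates the difference by subscribing twice to each Single. It assumes RxJava 3, and the helper names sizesWithReduce() and sizesWithCollect() are hypothetical:

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3 on the classpath

// Hypothetical helper: reduce() with a mutable seed.
// The same ArrayList instance is reused by every subscriber.
fun sizesWithReduce(): List<Int> {
    val single = Observable.just(1, 2, 3)
        .reduce(ArrayList<Int>()) { list, v -> list.apply { add(v) } }
    // Each blockingGet() subscribes again and appends to the shared list.
    return listOf(single.blockingGet().size, single.blockingGet().size)
}

// Hypothetical helper: collect() with a supplier.
// The supplier runs once per subscription, so each subscriber gets its own list.
fun sizesWithCollect(): List<Int> {
    val single = Observable.just(1, 2, 3)
        .collect({ ArrayList<Int>() }, { list, v -> list.add(v) })
    return listOf(single.blockingGet().size, single.blockingGet().size)
}

fun main() {
    println("reduce:  ${sizesWithReduce()}")  // reduce:  [3, 6]
    println("collect: ${sizesWithCollect()}") // collect: [3, 3]
}
```

With reduce(), the second subscription sees the three items left over from the first one, while collect() starts from an empty list each time.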

Mouaad Aallam

Software Engineer
