RxJava: Collection Operators

- 5 mins

It’s possible to consider Collection operators as a reducing operators since they consolidate emissions into a single one. Collection operators will accumulate all emissions into a collection (List, map, set…).

toList()

For a given Observable<T>, this operator will collect incoming emissions into a List<T> and then push it as a single emission (Single<List<T>>):

fun main() {
    Observable.just(1, 2, 3, 4)
        .toList() // Receives first emission at 1 sec
        .subscribe { l -> println("Received: $l") }
} 
Received: [1, 2, 3, 4]

It’s possible to specify the List<T> implementation:

fun main() {
    Observable.just(1, 2, 3, 4)
        .toList { CopyOnWriteArrayList<Int>() }
        .subscribe { l -> println("Received: ${l.javaClass}") }
}
Received: class java.util.concurrent.CopyOnWriteArrayList

toSortedList()

This operator will collect the emissions into a List that sorts the items naturally based on their Comparator implementation:

fun main() {
    Observable.just("Beta", "Alpha", "Delta", "Gamma", "Zeta", "Eta")
        .toSortedList()
        .subscribe { l -> println("Received: $l") }
}
Received: [Alpha, Beta, Delta, Eta, Gamma, Zeta]

It’s possible to provide a Comparator as an argument to apply a different sorting logic.

toMap()

This operator will collect emissions into Map<K,T> for a given Observable<T>, where K is the key type derived of a lambda Function<T,K> :

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap { s -> s[0] }
        .subscribe { m -> println("Received: $m") }
}
Received: {A=Alpha, B=Beta, D=Delta, E=Epsilon, G=Gamma}

It’s possible to yield a different value other than the emission to associate with the key by providing a second lambda argument that maps each emission to a different value:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap({ s -> s[0] }, { s -> s.length })
        .subscribe { m -> println("Received: $m") }
}
Received: {A=5, B=4, D=5, E=7, G=5, Z=4}

By default, toMap() will use HashMap. It’s possible to provide a third lambda argument that provides a different map implementation :

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap({ s -> s[0] }, { s -> s.length }, { ConcurrentHashMap() })
        .subscribe { m -> println("Received: ${m.javaClass}") }
}
Received: class java.util.concurrent.ConcurrentHashMap

toMultiMap()

For the operator toMap() , if a key maps to multiple emissions, the last emission for that key is going to replace subsequent ones:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMap { s -> s.length }
        .subscribe { m -> println("Received: $m") }
}
Received: {4=Zeta, 5=Delta, 7=Epsilon}

However, it’s possible to map to the same key multiple values using toMultimap():

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .toMultimap { s -> s.length }
        .subscribe { m -> println("Received: $m") }
}
Received: {4=[Beta, Zeta], 5=[Alpha, Gamma, Delta], 7=[Epsilon]}

collect()

When no collector has what is needed, it’s possible to use the collect() operator to specify a different type to collect items into:

fun main() {
    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
        .collect({ HashSet<String>() }, { s, v -> s.add(v) })
        .subscribe { s -> println("Received: ${s.javaClass} : $s") }
}
Received: class java.util.HashSet : [Gamma, Zeta, Delta, Alpha, Epsilon, Beta]

When putting emissions into a mutable object seed, it’s better to use collect() instead of reduce().

Mouaad Aallam

Mouaad Aallam

Software Engineer

rss facebook twitter bsky github youtube mail spotify instagram linkedin google pinterest medium vimeo mastodon gitlab docker