RxJava: Collection Operators
- 5 minsIt’s possible to consider Collection operators as a reducing operators since they consolidate emissions into a single one. Collection operators will accumulate all emissions into a collection (List, map, set…).
toList()
For a given Observable<T>, this operator will collect incoming emissions into a List<T> and then push it as a single emission (Single<List<T>>):
fun main() {
Observable.just(1, 2, 3, 4)
.toList() // Receives first emission at 1 sec
.subscribe { l -> println("Received: $l") }
}
Received: [1, 2, 3, 4]
It’s possible to specify the List<T> implementation:
fun main() {
Observable.just(1, 2, 3, 4)
.toList { CopyOnWriteArrayList<Int>() }
.subscribe { l -> println("Received: ${l.javaClass}") }
}
Received: class java.util.concurrent.CopyOnWriteArrayList
toSortedList()
This operator will collect the emissions into a List that sorts the items naturally based on their Comparator implementation:
fun main() {
Observable.just("Beta", "Alpha", "Delta", "Gamma", "Zeta", "Eta")
.toSortedList()
.subscribe { l -> println("Received: $l") }
}
Received: [Alpha, Beta, Delta, Eta, Gamma, Zeta]
It’s possible to provide a Comparator as an argument to apply a different sorting logic.
toMap()
This operator will collect emissions into Map<K,T> for a given Observable<T>, where K is the key type derived of a lambda Function<T,K> :
fun main() {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
.toMap { s -> s[0] }
.subscribe { m -> println("Received: $m") }
}
Received: {A=Alpha, B=Beta, D=Delta, E=Epsilon, G=Gamma}
It’s possible to yield a different value other than the emission to associate with the key by providing a second lambda argument that maps each emission to a different value:
fun main() {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
.toMap({ s -> s[0] }, { s -> s.length })
.subscribe { m -> println("Received: $m") }
}
Received: {A=5, B=4, D=5, E=7, G=5, Z=4}
By default, toMap() will use HashMap. It’s possible to provide a third lambda argument that provides a different map implementation :
fun main() {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
.toMap({ s -> s[0] }, { s -> s.length }, { ConcurrentHashMap() })
.subscribe { m -> println("Received: ${m.javaClass}") }
}
Received: class java.util.concurrent.ConcurrentHashMap
toMultiMap()
For the operator toMap() , if a key maps to multiple emissions, the last emission for that key is going to replace subsequent ones:
fun main() {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
.toMap { s -> s.length }
.subscribe { m -> println("Received: $m") }
}
Received: {4=Zeta, 5=Delta, 7=Epsilon}
However, it’s possible to map to the same key multiple values using toMultimap():
fun main() {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
.toMultimap { s -> s.length }
.subscribe { m -> println("Received: $m") }
}
Received: {4=[Beta, Zeta], 5=[Alpha, Gamma, Delta], 7=[Epsilon]}
collect()
When no collector has what is needed, it’s possible to use the collect() operator to specify a different type to collect items into:
fun main() {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon", "Zeta")
.collect({ HashSet<String>() }, { s, v -> s.add(v) })
.subscribe { s -> println("Received: ${s.javaClass} : $s") }
}
Received: class java.util.HashSet : [Gamma, Zeta, Delta, Alpha, Epsilon, Beta]
When putting emissions into a mutable object seed, it’s better to use collect() instead of reduce().