RxJava: Windowing


The window() operator is almost identical to buffer(), except that it groups emissions into other Observables rather than collections. The result is an Observable<Observable<T>>: each emitted item is itself an Observable that relays the items of one window as they arrive, instead of waiting for a whole batch to be collected. The emitted Observables can be transformed using operators like flatMap(), concatMap(), or switchMap().
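
To make the relationship with buffer() concrete, here is a minimal sketch that chunks the same source both ways. The function names viaBuffer and viaWindow are hypothetical, and the import assumes RxJava 3 (on RxJava 2 the class lives in io.reactivex.Observable).

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3

// buffer(): each emission is a ready-made List of up to 3 items.
fun viaBuffer(): List<List<Int>> =
    Observable.range(1, 7)
        .buffer(3)
        .toList()
        .blockingGet()

// window(): each emission is an Observable that still has to be consumed,
// here by collecting every window into a List with toList().
fun viaWindow(): List<List<Int>> =
    Observable.range(1, 7)
        .window(3)
        .flatMapSingle { it.toList() }
        .toList()
        .blockingGet()

fun main() {
    println(viaBuffer()) // [[1, 2, 3], [4, 5, 6], [7]]
    println(viaWindow()) // [[1, 2, 3], [4, 5, 6], [7]]
}
```

Both calls produce the same chunks; the difference is that a window can be processed item by item while it is still open.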

Fixed-size

The simplest overload of window() accepts a count argument, the maximum number of items per window:

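import io.reactivex.rxjava3.core.Observable // RxJava 3 assumed; on RxJava 2 import io.reactivex.Observable

fun main() {
    Observable.range(1, 50)
        .window(8) // open a new window every 8 items
        // join the items of each window into one string once that window completes
        .flatMapSingle { it.reduce("") { total, next -> "$total $next" } }
        .subscribe { println("Received: $it") }
}

Output: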
Received:  1 2 3 4 5 6 7 8
Received:  9 10 11 12 13 14 15 16
Received:  17 18 19 20 21 22 23 24
Received:  25 26 27 28 29 30 31 32
Received:  33 34 35 36 37 38 39 40
Received:  41 42 43 44 45 46 47 48
Received:  49 50

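Just like with buffer(), it is possible to provide a skip argument, which sets how many items to count before a new window starts. When skip is larger than count, the items in between are dropped (9 to 12 in the output below):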

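import io.reactivex.rxjava3.core.Observable // RxJava 3 assumed; on RxJava 2 import io.reactivex.Observable

fun main() {
    Observable.range(1, 50)
        .window(8, 12) // count = 8 items per window, skip = 12: a new window starts every 12 items
        // join the items of each window into one string once that window completes
        .flatMapSingle { it.reduce("") { total, next -> "$total $next" } }
        .subscribe { println("Received: $it") }
}

Output: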
Received:  1 2 3 4 5 6 7 8
Received:  13 14 15 16 17 18 19 20
Received:  25 26 27 28 29 30 31 32
Received:  37 38 39 40 41 42 43 44
Received:  49 50
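
The opposite case, a skip smaller than count, makes consecutive windows overlap, as it does with buffer(). The following is a small sketch of that behavior; overlappingWindows is a hypothetical helper name and the import again assumes RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3

// count = 3, skip = 2: a new window opens every 2 items and holds up to 3,
// so the last item of one window is also the first item of the next.
fun overlappingWindows(): List<List<Int>> =
    Observable.range(1, 5)
        .window(3, 2)
        .flatMapSingle { it.toList() } // subscribe to each window as soon as it is emitted
        .toList()
        .blockingGet()

fun main() {
    overlappingWindows().forEach { println("Received: $it") }
}
```

With these arguments the windows should be [1, 2, 3], [3, 4, 5] and [5]: items 3 and 5 each land in two windows.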

Time-based

It is also possible to cut off windowed Observables at fixed time intervals:

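import io.reactivex.rxjava3.core.Observable // RxJava 3 assumed; on RxJava 2 import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main() {
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map { (it + 1) * 300 } // map to elapsed time
        .window(1, TimeUnit.SECONDS) // close the current window and open a new one every second
        .flatMapSingle { it.reduce("") { total, next -> "$total $next" } }
        .subscribe { println("Received: $it") }

    // interval() emits on a background scheduler, so keep the main thread alive
    TimeUnit.SECONDS.sleep(4)
}

Output: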
Received:  300 600 900
Received:  1200 1500 1800
Received:  2100 2400 2700
Received:  3000 3300 3600 3900

It is also possible to specify count and timeshift arguments, just like with the buffer() operator.
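
As a rough sketch of the count variant, the overload window(timespan, unit, count) closes a window when either the time span elapses or the window holds count items, whichever comes first. The example below deliberately uses a very long time span and a synchronous source so that only the count limit applies and the result stays predictable; timeOrCountWindows is a hypothetical name and the import assumes RxJava 3.

```kotlin
import io.reactivex.rxjava3.core.Observable // assumption: RxJava 3
import java.util.concurrent.TimeUnit

// Windows are cut after 1 hour or after 4 items, whichever happens first.
// range() emits all 10 items immediately, so only the count limit triggers here.
fun timeOrCountWindows(): List<List<Int>> =
    Observable.range(1, 10)
        .window(1, TimeUnit.HOURS, 4)
        .flatMapSingle { it.toList() }
        .toList()
        .blockingGet()

fun main() {
    timeOrCountWindows().forEach { println("Received: $it") }
}
```

With a real timed source such as interval(), the same call would also cut windows on the time boundary, so window sizes would vary between 0 and count items.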

Mouaad Aallam


Software Engineer
