RxJava groupBy operator: items from different groups are interleaved
Consider the following code:
    Observable
        .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
        .doOnNext(item -> System.out.println("source emitting " + item))
        .groupBy(item -> {
            System.out.println("groupBy called for " + item);
            return item % 3;
        })
        .subscribe(observable -> {
            System.out.println("got observable " + observable + " for key " + observable.getKey());
            observable.subscribe(item -> {
                System.out.println("key " + observable.getKey() + ",item " + item);
            });
        });
The result puzzles me. The output I get is:
    source emitting 0
    groupBy called for 0
    got observable rx.observables.GroupedObservable@42110406 for key 0
    key 0,item 0
    source emitting 1
    groupBy called for 1
    got observable rx.observables.GroupedObservable@1698c449 for key 1
    key 1,item 1
    source emitting 2
    groupBy called for 2
    got observable rx.observables.GroupedObservable@5ef04b5 for key 2
    key 2,item 2
    source emitting 3
    groupBy called for 3
    key 0,item 3
    source emitting 4
    groupBy called for 4
    key 1,item 4
    source emitting 5
    groupBy called for 5
    key 2,item 5
    source emitting 6
    groupBy called for 6
    key 0,item 6
    source emitting 7
    groupBy called for 7
    key 1,item 7
    source emitting 8
    groupBy called for 8
    key 2,item 8
    source emitting 9
    groupBy called for 9
    key 0,item 9
So, in the top-level subscribe method, I get three GroupedObservable objects, as expected. Then I subscribe to the group observables one by one. Here is what I don't understand:
Why are the items still emitted in the original source order (i.e. 0, 1, 2, 3, ...) instead of 0, 3, 6, 9 for key 0, followed by 1, 4, 7 for key 1, followed by 2, 5, 8 for key 2?
I think I understand how the groups are created:
1. 0 is emitted, the key function is called and it gets 0
2. it is checked if an observable for 0 exists; it doesn't, so a new one is created and emitted, and then it emits 0
3. the same happens for source items 1 and 2, as they both create new groups; observables with keys 1 and 2 are emitted, and they emit 1 and 2 correspondingly
4. source item 3 is emitted, the key function is called and it gets 0
5. it is checked if an observable for 0 exists; it does, so no new grouped observable is created or emitted, but 3 is emitted by the already existing observable
6. etc. until the source sequence is drained
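The steps above can be modeled in plain Java, without RxJava. This is only a hypothetical sketch: the class `GroupByModel` and its `groupBy` and `run` methods are invented for illustration and are not how RxJava implements the operator. It assumes a synchronous source, and it shows that each item is handed to its group's consumer before the next source item is read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical plain-Java model of the steps above; NOT RxJava's implementation.
final class GroupByModel {

    // Pushes each source item, in order, to the consumer registered for its key.
    // onNewGroup is called once per key and returns that group's consumer.
    static void groupBy(List<Integer> source,
                        Function<Integer, Integer> keyOf,
                        Function<Integer, Consumer<Integer>> onNewGroup) {
        Map<Integer, Consumer<Integer>> groups = new LinkedHashMap<>();
        for (Integer item : source) {
            Integer key = keyOf.apply(item);
            // A new group is created only the first time a key is seen.
            Consumer<Integer> group = groups.computeIfAbsent(key, onNewGroup);
            // The item is delivered immediately, before the next source item is read.
            group.accept(item);
        }
    }

    // Runs the model with the key function item % 3 and records what happens.
    static List<String> run(List<Integer> source) {
        List<String> log = new ArrayList<>();
        groupBy(source, item -> item % 3, key -> {
            log.add("got group for key " + key);
            return item -> log.add("key " + key + ",item " + item);
        });
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)).forEach(System.out::println);
    }
}
```

In this model, "key 0,item 3" is logged right after "key 2,item 2", which mirrors the ordering in the real output.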
It seems that although I subscribe to the group observables one by one, their emissions are somehow interleaved. How does this happen?
Solution
You have answered your own question. You are operating on a stream of items in the order they are emitted. So, as each item is emitted, it is passed down the operator chain, and you see the output you showed here.
The alternative output you expect would require the chain to wait until the source has stopped emitting items for all groups. Suppose you had Observable.just(0, 1, 2, 3, 4, 0). Then you would expect (0, 3, 0), (1, 4), (2) as your output groups. What if you had an infinite stream of values from 0 to 4? Your subscriber would never receive 0, 3, ... from the first group.
You can create the behavior you are looking for. The toList operator buffers the output until the source completes, and then passes a List<R> to the subscriber:
    .subscribe(observable -> {
        System.out.println("got observable " + observable + " for key " + observable.getKey());
        observable.toList().subscribe(items -> {
            // items is a List<Integer>
            System.out.println("key " + observable.getKey() + ",items " + items);
        });
    });
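To see why buffering changes the order, here is a plain-Java sketch of the same idea without RxJava. The class `BufferedGroupsModel` and its `run` method are hypothetical names made up for this illustration, not part of any library, and the code is only a model of "collect per key, deliver on completion". Nothing is reported until the loop over the source has finished, which is why this approach cannot work for a source that never completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "group, then collect each group into a list";
// NOT RxJava's implementation.
final class BufferedGroupsModel {

    static List<String> run(List<Integer> source) {
        // One buffer per key, kept in order of first appearance.
        Map<Integer, List<Integer>> buffers = new LinkedHashMap<>();
        for (Integer item : source) {
            // Nothing is delivered here; the item is only stored in its group's buffer.
            buffers.computeIfAbsent(item % 3, key -> new ArrayList<>()).add(item);
        }
        // Only reached once the source has completed.
        List<String> log = new ArrayList<>();
        buffers.forEach((key, items) -> log.add("key " + key + ",items " + items));
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)).forEach(System.out::println);
    }
}
```

For the ten-item source, this model reports the items of key 0 together, then those of key 1, then those of key 2.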