How do I wait for multiple nested asynchronous calls using RxJava on Android?
I'm new to RxJava. This is my situation:
> Send request A and get a List<A> back
> For each A, send request AA and get an AA back, then bind the A and the AA
> There are B and BB with similar logic
> Do something only after all requests have completed
Example:
request(url1, callback(List<A> listA) {
    for (A a : listA) {
        request(url2, callback(AA aa) {
            a.set(aa);
        }
    }
}
A and B are independent of each other.
How should I structure the code? I also use Retrofit as the network client.
Thank you.
Solution
OK, I think this should solve the first part of your problem:
Note that the second call to flatMap takes two arguments. There is an overload of flatMap that not only produces an Observable for each input item, but also takes a second function that combines each item emitted by that Observable with the corresponding input item.
Look at the third figure under this heading for an intuitive picture:
https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable
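// RxJava 1.x types: rx.Observable, rx.functions.Func1, rx.functions.Func2
Observable<A> observableOfAs = retrofitClient.getListOfAs()
    // Unpack the single List<A> into a stream of individual A items
    .flatMap(new Func1<List<A>, Observable<A>>() {
        @Override
        public Observable<A> call(List<A> listOfAs) {
            return Observable.from(listOfAs);
        }
    })
    // For each A, request its AA, then combine the two
    .flatMap(new Func1<A, Observable<AA>>() {
        @Override
        public Observable<AA> call(A someA) {
            return retrofitClient.getTheAaForMyA(someA);
        }
    }, new Func2<A, AA, A>() {
        @Override
        public A call(A someA, AA theAaforMyA) {
            // Bind the AA to its A and pass the A downstream
            someA.set(theAaforMyA);
            return someA;
        }
    })
    ...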
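To make the role of that second function concrete, here is a plain-Java, synchronous model of the two-argument flatMap. It is only a sketch for building intuition, not RxJava code: the class and method names (FlatMapWithSelectorSketch, flatMapWithSelector) are made up for this illustration, and lists stand in for Observables. The real operator runs asynchronously and may interleave results from different inner Observables, whereas this model keeps everything in source order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of RxJava.
final class FlatMapWithSelectorSketch {

    // For every source item, look up its inner items, then hand the
    // (source item, inner item) pair to the combiner and collect the result.
    static <T, U, R> List<R> flatMapWithSelector(
            List<T> source,
            Function<T, List<U>> inner,
            BiFunction<T, U, R> combine) {
        List<R> out = new ArrayList<>();
        for (T item : source) {
            for (U innerItem : inner.apply(item)) {
                out.add(combine.apply(item, innerItem));
            }
        }
        return out;
    }
}
```

In the answer's terms, source is the stream of As, inner is the request that fetches the AA for one A, and combine is the function that binds that AA to its A.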
From here on, I'm not sure how you want to continue: do you want to subscribe to the resulting Observable of As? That way you could either handle each A as it arrives (onNext) or just wait until all of them are done (onCompleted).
Addendum: to collect all items into a single list, you can turn your Observable<A> into an Observable<List<A>> using toList():
https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist
So you have:
Observable<List<A>> observableOfListOfAs = observableOfAs.toList();
If you need more fine-grained control over how the list is built, you can also use reduce():
https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce
For the Bs, simply repeat the whole flow you used for the As.
You can then use zip to wait for both streams to complete:
Observable.zip(
        observableOfListOfAs,
        observableOfListOfBs,
        new Func2<List<A>, List<B>, MyPairOfLists>() {
            @Override
            public MyPairOfLists call(List<A> as, List<B> bs) {
                return new MyPairOfLists(as, bs);
            }
        }
    )
    .subscribe(new Subscriber<MyPairOfLists>() {
        // onError() and onCompleted() are omitted here
        @Override
        public void onNext(MyPairOfLists pair) {
            // Now both the as and the bs are ready to use:
            List<A> as = pair.getAs();
            List<B> bs = pair.getBs();
            // do something here!
        }
    });
I think you can guess the definition of MyPairOfLists.
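For completeness, here is one way MyPairOfLists could look. This is only a minimal sketch: the empty A and B classes are placeholders for your own model types, and the defensive copies are a design choice of this sketch, not something the zip call requires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Placeholder element types; the real A and B come from your own model.
final class A { }
final class B { }

// Immutable holder for the two completed result lists.
final class MyPairOfLists {
    private final List<A> as;
    private final List<B> bs;

    MyPairOfLists(List<A> as, List<B> bs) {
        // Copy the inputs so later changes to them do not affect the pair.
        this.as = Collections.unmodifiableList(new ArrayList<>(as));
        this.bs = Collections.unmodifiableList(new ArrayList<>(bs));
    }

    List<A> getAs() {
        return as;
    }

    List<B> getBs() {
        return bs;
    }
}
```

The getter names match the ones used in onNext above (getAs() and getBs()), so the subscriber code works with this class unchanged.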
