How to wait for multiple nested asynchronous calls using RxJava on Android?
I'm new to RxJava. This is my situation:
> send request A and get a List<A> back
> for each A, send request AA and get an AA back, then bind the A and the AA together
> there are B and BB with the same logic
> do something only after all requests have completed
Example:
    request(url1, callback(List<A> listA) {
        for (A a : listA) {
            request(url2, callback(AA aa) {
                a.set(aa);
            });
        }
    });
The A and B requests are independent of each other.
How should I structure the code? I am also using Retrofit as the network client.
Thank you.
Solution
OK, I think this should solve the first part of your problem:
Note that the second call to flatMap takes two arguments. There is an overload of flatMap that takes not only a function producing an Observable for each input item, but also a second function that combines each item emitted by that Observable with the corresponding input item.
See the third diagram under this heading for an intuitive picture:
https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable
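    // RxJava 1.x imports
    import java.util.List;
    import rx.Observable;
    import rx.functions.Func1;
    import rx.functions.Func2;

    Observable<A> observableOfAs = retrofitClient.getListOfAs()
        // Unpack the List<A> into a stream of individual A items.
        .flatMap(new Func1<List<A>, Observable<A>>() {
            @Override
            public Observable<A> call(List<A> listOfAs) {
                return Observable.from(listOfAs);
            }
        })
        // Request the AA for each A, then combine each AA with its A.
        .flatMap(new Func1<A, Observable<AA>>() {
            @Override
            public Observable<AA> call(A someA) {
                return retrofitClient.getTheAaForMyA(someA);
            }
        }, new Func2<A, AA, A>() {
            @Override
            public A call(A someA, AA theAaforMyA) {
                // set() is used as a plain statement in the question,
                // so do not rely on its return value.
                someA.set(theAaforMyA);
                return someA;
            }
        })
        ...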
From here on I am not sure how you want to continue: are you ready to simply subscribe to the resulting Observable of As? That way you could either handle each A as it arrives (onNext) or just wait until all of them are done (onCompleted).
Addendum: to collect all items into a single list, you can turn your Observable<A> into an Observable<List<A>> by using toList():
https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist
So you have:
Observable<List<A>> observableOfListOfAs = observableOfAs.toList();
If you need more fine-grained control over how the list is built, you can also use reduce():
https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce
For the Bs, simply duplicate the whole flow you used for the As.
You can then use zip to wait for both flows to complete:
    Observable.zip(
            observableOfListOfAs,
            observableOfListOfBs,
            new Func2<List<A>, List<B>, MyPairOfLists>() {
                @Override
                public MyPairOfLists call(List<A> as, List<B> bs) {
                    return new MyPairOfLists(as, bs);
                }
            })
        .subscribe(new Subscriber<MyPairOfLists>() {

            // onError() and onCompleted() are omitted here

            @Override
            public void onNext(MyPairOfLists pair) {
                // Now both the as and the bs are ready to use:
                List<A> as = pair.getAs();
                List<B> bs = pair.getBs();
                // do something here!
            }
        });
I think you can guess the definition of MyPairOfLists.
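For completeness, here is one way such a holder could look. This is only a sketch under assumptions: the constructor and the getAs()/getBs() accessors match how the class is used in the zip example, while the empty A and B classes are hypothetical placeholders standing in for your own model types.

```java
import java.util.List;

// Hypothetical placeholders for the question's A and B model types.
class A {}
class B {}

// Simple immutable holder for the two result lists combined by zip.
final class MyPairOfLists {
    private final List<A> as;
    private final List<B> bs;

    MyPairOfLists(List<A> as, List<B> bs) {
        this.as = as;
        this.bs = bs;
    }

    // Accessor names taken from the subscriber code in the answer.
    List<A> getAs() {
        return as;
    }

    List<B> getBs() {
        return bs;
    }
}
```

A dedicated class keeps the subscriber code readable; any generic pair type would serve the same purpose if you already have one in your project.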