How to wait for multiple nested asynchronous calls by using rxjava Android?

I'm new to RX Java. This is my situation,

>Send request a and get the list < a > back > for each a, send request AA and get AA, bind a and AA, and then > have B & BB with similar logic > do something only after all requests are completed

Example:

request(url1,callback(List<A> listA) {
    for (A a : listA) {
        request(url2,callback(AA aa) {
            a.set(aa);
        }
    }
}

A and B are independent

How to build code? I also use retrofit as a web client

thank you.

Solution

OK, I think this should solve the first part of your problem:

Please note that the second call to flatmap gives two parameters - there is a flatmap version, which not only generates an observable for each input item, but also needs a second function, which compares each item in the obtained observable with the corresponding input item

Look at the third figure under this heading for an intuitive understanding:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap -concatmap-and-flatmapiterable

Observable<A> obeservableOfAs = retrofitClient.getListOfAs()
.flatMap(new Func1<List<A>,Observable<A>>() {

    @Override
    public Observable<A> call(List<A> listOfAs) {
        return Observable.from(listOfAs);
    }

)}
.flatMap(new Func1<A,Observable<AA>>() {

    @Override
    public Observable<AA> call(A someA) {
        return retrofitClient.getTheAaForMyA(someA);
    }

},new Func2<A,AA,A>() {

    @Override
    public A call(A someA,AA theAaforMyA) {
        return someA.set(theAaforMyA);
    }

})
...

From here on, I'm still not sure you want to continue: are you ready to subscribe to the resulting observable? In this way, you can process each as (onnext) or just wait until all are completed

Appendix: to collect all items into a single list, you can turn your "observable" < a > into observable < list < a > > using tolist()

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist

So you have:

Observable<List<A>> observableOfListOfAs = observableOfAs.toList();

If you need more fine-grained control over the construction of the list, you can also use reduce

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce

For B, just copy the whole process you use for as

You can then use zip to wait for two streams to complete:

Observable.zip(
    observableOfListOfAs,observableOfListOfBs,new Func2<List<A>,List<B>,MyPairOfLists>() {

        @Override
        public MyPairOfLists call(List<A> as,List<B> bs) {
            return new MyPairOfLists(as,bs);
        }
    }
)
.subscribe(new Subscriber<MyPairOfLists>() {

    // onError() and onCompleted() are omitted here

    @Override
    public void onNext(MyPairOfLists pair) {
        // Now both the as and the bs are ready to use:

        List<A> as = pair.getAs();
        List<B> bs = pair.getBs();

        // do something here!
    }
});

I think you can guess the definition of mypairoflists

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>