RxJava: retry the same item after an error

I'm trying to build a robust processing pipeline using RxJava, but I've run into a problem.

Here is an example:

public static void main(String[] args) {
    AtomicInteger div = new AtomicInteger(-1);
    Observable.just(1,1,1).map(item -> 1 / div.getAndIncrement())
            .retry().subscribe(item -> System.out.println(item));
}

The output in this case is 4 items, because a non-streaming observable is replayed on resubscription; that is beside the point, so ignore it for simplicity. I've added comments showing the key calculations and the resubscription that produce this result:

-1 // 1 / -1
// 1/0 (error) - resubscribes to observable
1 // 1 / 1
0 // 1 / 2
0 // 1 / 3

This happens because the retry operator (like all retry operators) resubscribes to the source after an error notification is delivered.

My expected output is:

-1 // 1 / -1
// 1/0 (error) - resubscribe, but resume with the failed item (1)
1 // 1 / 1
0 // 1 / 2

When an error notification is delivered, the resubscription should put the failed item back into the stream (retry on the same item), because the error is external and not inherent in the item being processed (so reprocessing it makes sense).

This is the case with some external errors (such as a lost database connection), where I'd like the unprocessed items to be reprocessed after a delay. I know that I can resubscribe using the standard retry operators, but all of them drop the item that failed.

I've also considered wrapping all my processing in try/catch wherever I suspect an error is possible, but that adds boilerplate to my processing code, which I'd rather avoid.

So my question is: is there a standard way to retry failed items?

What I've considered doing (untested):

someSubject.flatMap(
    item-> Observable.just(item)
        .doOnError(err -> someSubject.onNext(item))).onErrorX...

And then suppress the errors.

But this seems unnatural and expensive for my use case (it creates an observable for every item).

Is there an operator, or combination of operators, that makes a retry feed the failed item back to the start of the observable, without "breaking" the chain or wrapping each item in a separate observable?

This is also the behavior I'm used to from async retry.

Solution

This is generally not possible with RxJava. If the processing of an element fails, there is no built-in way to resume from that position. The best you can do is wrap the problematic function callback in try/catch and retry manually. The second-best option is to use flatMap, where the computation that may fail is an inner Observable that can be retried individually:

source.flatMap(v ->
    // x avoids redeclaring v, which Java does not allow in a nested lambda
    Observable.just(v).map(x -> x / counter.getAndIncrement()).retry()
)
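
For the first option (try/catch around the callback with a manual retry), here is a minimal sketch in plain Java. The `retrying` helper, the `ManualRetry` class, and the attempt limit of 3 are hypothetical names and values chosen for illustration; none of them are part of RxJava. It assumes the failure surfaces as a RuntimeException and that retrying immediately, with no delay, is acceptable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ManualRetry {

    // Hypothetical helper (not part of RxJava): wraps a function so that a
    // failing call is repeated on the SAME input, up to maxAttempts times.
    static <T, R> Function<T, R> retrying(Function<T, R> fn, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return input -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return fn.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try the same input again
                }
            }
            throw last; // every attempt failed, so propagate the last error
        };
    }

    // Runs the three-item example from the question through the wrapped callback.
    static List<Integer> run() {
        AtomicInteger div = new AtomicInteger(-1);
        Function<Integer, Integer> divide =
                retrying(item -> 1 / div.getAndIncrement(), 3);
        List<Integer> results = new ArrayList<>();
        for (int item : new int[] {1, 1, 1}) {
            results.add(divide.apply(item));
        }
        return results;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Inside an RxJava chain the wrapped function could be called from the map lambda, for example `.map(item -> divide.apply(item))`, so the failed item is retried in place and the stream never sees the error unless all attempts fail. With the values above, the second item fails once (1 / 0) and succeeds on its second attempt (1 / 1), so the results are -1, 1, 0, which matches the expected output in the question.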