Exponential backoff in rxjava

I have an API that accepts an observable that triggers an event

I want to return an observable. If an Internet connection is detected, a value will be sent out every defaultdelay second. If there is no connection, numberoffailedattempts ^ 2 times will be delayed

I tried a bunch of different styles. The biggest problem I encountered was that when I tried again, I could only evaluate the observable situation once:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay,TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

Is there any way to do what I want to do? I found a related problem (I can't find it at present), but the method used doesn't seem to be suitable for dynamic values

Solution

There are two errors in your code:

>In order to repeat some observable sequence, the sequence must be limited That is, instead of spacing, you'd better use something like just or from callable, as I did in the following example. > From the internal function of repeatwhen, you need to return a new delay observable source, so you must return observable Timer () instead of observable delay().

Work code:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry,2);
                    logger.info("retry={},additionalDelay={}ms",retry,additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay,TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}",v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

See the detailed article about repeatwhen here

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>