Exponential backoff in RxJava
I have an API that accepts an Observable that triggers an event.
I want to return an Observable that emits a value every defaultDelay seconds while an Internet connection is detected. When there is no connection, the next attempt should instead be delayed by numberOfFailedAttempts^2 seconds.
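To make the intended delay rule concrete, here is a minimal sketch in plain Java. The class name QuadraticBackoff and the method delaySeconds are hypothetical names chosen for illustration, and the sketch assumes the delay is measured in seconds, as in the code further down. Note that attempts^2 grows quadratically rather than exponentially.

```java
/** Hypothetical helper (not part of RxJava): the delay rule described above. */
final class QuadraticBackoff {
    private QuadraticBackoff() {}

    /**
     * Returns failedAttempts^2, interpreted as seconds.
     * The multiplication is done in long so large counters do not overflow int.
     */
    static long delaySeconds(int failedAttempts) {
        if (failedAttempts < 0) {
            throw new IllegalArgumentException("failedAttempts must be >= 0");
        }
        return (long) failedAttempts * failedAttempts;
    }

    public static void main(String[] args) {
        // Print the delay for the first few consecutive failures.
        for (int attempts = 0; attempts <= 4; attempts++) {
            System.out.println(attempts + " failed attempt(s) -> "
                    + delaySeconds(attempts) + " s");
        }
    }
}
```

Using integer multiplication instead of Math.pow avoids the round trip through double and the cast back to int.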
I have tried a number of different approaches. The biggest problem I have run into is that the observable passed to repeatWhen is evaluated only once:
Observable
    .interval(defaultDelay, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
        observable.concatMap(repeatObservable -> {
            if (internetConnectionDetector.isInternetConnected()) {
                consecutiveRetries = 0;
                return observable;
            } else {
                consecutiveRetries++;
                int backoffDelay = (int) Math.pow(consecutiveRetries, 2);
                return observable.delay(backoffDelay, TimeUnit.SECONDS);
            }
        }).onBackpressureDrop())
    .onBackpressureDrop();
Is there any way to do what I am trying to do? I found a related question (I can't find it at the moment), but the approach used there doesn't seem to work with a dynamic value.
Solution
There are two errors in your code:
1. To repeat an observable sequence, the sequence must be finite. That is, instead of interval, you should use something like just or fromCallable, as I did in the following example.
2. From the inner function of repeatWhen you need to return a new delayed observable source, so you must return Observable.timer() instead of observable.delay().
Working code:
public void testRepeat() throws InterruptedException {
    logger.info("test start");
    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
        .repeatWhen(counts -> {
            AtomicInteger retryCounter = new AtomicInteger(0);
            return counts.flatMap(c -> {
                int retry = 0;
                if (connectionAlive.get()) {
                    retryCounter.set(0); // reset counter
                } else {
                    retry = retryCounter.incrementAndGet();
                }
                int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
            });
        })
        .subscribe(v -> logger.info("got {}", v));
    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}
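To see which delays the handler above would pick without running RxJava, the same counter logic can be traced in plain Java. This is only a sketch: the class BackoffScheduleSketch and the method delaysFor are hypothetical names, and the array of connection states is made-up input, not output from the test.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical trace of the delay rule used inside repeatWhen above. */
final class BackoffScheduleSketch {
    static final int DEFAULT_DELAY = 100;    // ms, same value as in the test
    static final int ADDITIONAL_DELAY = 100; // ms, same value as in the test

    private BackoffScheduleSketch() {}

    /** Returns the delay in ms chosen after each repeat signal. */
    static List<Integer> delaysFor(boolean[] connectionStates) {
        List<Integer> delays = new ArrayList<>();
        int retryCounter = 0;
        for (boolean alive : connectionStates) {
            int retry = 0;
            if (alive) {
                retryCounter = 0;       // connection is fine: reset the counter
            } else {
                retry = ++retryCounter; // one more consecutive failure
            }
            delays.add(DEFAULT_DELAY + ADDITIONAL_DELAY * retry * retry);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed input: two checks online, three offline, then online again.
        boolean[] states = {true, true, false, false, false, true};
        System.out.println(delaysFor(states)); // [100, 100, 200, 500, 1000, 100]
    }
}
```

The delay grows while the connection stays down and drops back to DEFAULT_DELAY on the first check after it returns.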
See the detailed article about repeatWhen here
