Exponential backoff in RxJava
I have an API that accepts an Observable that triggers an event.
I want to return an Observable that emits a value every defaultDelay seconds while an Internet connection is detected. If there is no connection, the emission should instead be delayed by numberOfFailedAttempts^2.
I have tried a bunch of different styles. The biggest problem I ran into is that the observable passed to repeatWhen seems to be evaluated only once:
    Observable
        .interval(defaultDelay, TimeUnit.MILLISECONDS)
        .observeOn(Schedulers.io())
        .repeatWhen((observable) ->
            observable.concatMap(repeatObservable -> {
                if (internetConnectionDetector.isInternetConnected()) {
                    consecutiveRetries = 0;
                    return observable;
                } else {
                    consecutiveRetries++;
                    int backoffDelay = (int) Math.pow(consecutiveRetries, 2);
                    return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
            }).onBackpressureDrop())
        .onBackpressureDrop();
Is there any way to do what I am trying to do? I found a related question (I can't find it at the moment), but the approach it used did not seem to work with a dynamic value.
Solution
There are two errors in your code:
1. To repeat an observable sequence, the sequence must be finite. interval never completes, so repeatWhen never gets a chance to resubscribe. Instead of interval, you'd better use something like just or fromCallable, as I did in the example below.
2. From the inner function of repeatWhen, you need to return a new delayed observable source, so you must return Observable.timer() instead of observable.delay().
Working code:
    public void testRepeat() throws InterruptedException {
        logger.info("test start");

        int DEFAULT_DELAY = 100;    // ms
        int ADDITIONAL_DELAY = 100; // ms
        AtomicInteger generator = new AtomicInteger(0);
        AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

        Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={},additionalDelay={}ms", retry, additionalDelay);
                    // a fresh timer per repetition decides when to resubscribe
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

        Thread.sleep(220);
        logger.info("connection dropped");
        connectionAlive.set(false);

        Thread.sleep(2000);
        logger.info("connection is back alive");
        connectionAlive.set(true);

        Thread.sleep(2000);
        subscription.dispose();
        logger.info("test complete");
    }
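To make the delay rule easier to check without running RxJava, here is a minimal plain-Java sketch of the arithmetic performed inside the repeatWhen handler above. The class name BackoffSchedule and the method nextDelayMs are hypothetical names introduced only for this illustration; they are not part of RxJava or of the code above.

```java
/**
 * Plain-Java model of the delay rule from the repeatWhen handler.
 * BackoffSchedule and nextDelayMs are hypothetical names for illustration.
 */
final class BackoffSchedule {
    private final long defaultDelayMs;
    private final long additionalDelayMs;
    private int retry = 0; // consecutive checks that found the connection down

    BackoffSchedule(long defaultDelayMs, long additionalDelayMs) {
        this.defaultDelayMs = defaultDelayMs;
        this.additionalDelayMs = additionalDelayMs;
    }

    /** Returns the delay before the next repetition for the given connection state. */
    long nextDelayMs(boolean connectionAlive) {
        if (connectionAlive) {
            retry = 0; // connection is back: reset the counter
        } else {
            retry++;   // one more consecutive failure
        }
        // default delay plus an extra delay that grows with retry squared
        return defaultDelayMs + additionalDelayMs * (long) retry * retry;
    }

    public static void main(String[] args) {
        BackoffSchedule schedule = new BackoffSchedule(100, 100);
        boolean[] states = {true, false, false, false, true};
        for (boolean alive : states) {
            System.out.println("alive=" + alive + " -> " + schedule.nextDelayMs(alive) + " ms");
        }
    }
}
```

With DEFAULT_DELAY = 100 and ADDITIONAL_DELAY = 100, the states alive, down, down, down, alive give delays of 100, 200, 500, 1000 and 100 ms. Note that retry squared grows quadratically; if a strictly exponential backoff is wanted, a term such as 2^retry could be substituted in the same place.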
See the detailed article about repeatWhen here.