RX Java – how do I pause observable without losing the emitted items?
I have an observable that emits a tick every second:
Observable.interval(0,1,TimeUnit.SECONDS) .take(durationInSeconds + 1));
I want to pause this observable so that it stops sending numbers and resumes it as needed
There are some pitfalls:
>According to the observable Javadoc, the interval operator does not support back pressure > the rxjava wiki about backpressure has a section about callstack blocking as an alternative to flow control of back pressure:
Is there any way to pause interval observable? Or should I use some back pressure support to achieve my own 'tick' observable?
Solution
There are many ways to do this For example, you can still use interval () and maintain two other states: for example, Boolean flag "paused" and a counter
public static final Observable<Long> pausableInterval( final AtomicBoolean paused,long initial,long interval,TimeUnit unit,Scheduler scheduler) { final AtomicLong counter = new AtomicLong(); return Observable.interval(initial,interval,unit,scheduler) .filter(tick -> !paused.get()) .map(tick -> counter.getAndIncrement()); }
Then you just need to call paused somewhere Set (true / false) to pause / resume
Editor: June 4, 2016
There are some problems with the above solution If we reuse the observable instance multiple times, it will start with the value at the last unsubscribe For example:
Observable<Long> o = pausableInterval(...) List<Long> list1 = o.take(5).toList().toBlocking().single(); List<Long> list2 = o.take(5).toList().toBlocking().single();
Although LIST1 will be [0,2,3,4], List2 is actually [5,6,7,8,9] If you do not want the above behavior, you must make the observer stateless This can be achieved by the scan () operator The revised version may be as follows:
public static final Observable<Long> pausableInterval(final AtomicBoolean pause,final long initialDelay,final long period,Scheduler scheduler) { return Observable.interval(initialDelay,period,scheduler) .filter(tick->!pause.get()) .scan((acc,tick)->acc + 1); }
Alternatively, if you don't want to rely on Java 8 and Lambdas, you can do so using java 6 compatible code:
https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343 -L361