RxJava – how do I pause an observable without losing the emitted items?

I have an observable that emits a tick every second:

Observable.interval(0,1,TimeUnit.SECONDS)
    .take(durationInSeconds + 1);

I want to pause this observable so that it stops emitting numbers, and resume it as needed.

There are some pitfalls:

> According to the Observable Javadoc, the interval operator does not support backpressure.
> The RxJava wiki on backpressure has a section about callstack blocking as a flow-control alternative to backpressure.

Is there any way to pause an interval observable? Or should I implement my own 'tick' observable with backpressure support?

Solution

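There are many ways to do this. For example, you can still use interval() and maintain two additional pieces of state: a Boolean flag "paused" and a counter. Ticks that fire while the flag is set are filtered out, and the counter renumbers the ticks that pass, so the downstream sequence has no gaps.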

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused,long initial,long interval,TimeUnit unit,Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial,interval,unit,scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

Then you just need to call paused.set(true) or paused.set(false) somewhere to pause or resume.
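To make the effect of the filter and map steps concrete without running a scheduler, here is a minimal JDK-only model of the same logic. It is not RxJava code; the class PausableTickModel and its methods are hypothetical names used only for illustration, and raw ticks are fed in by a plain loop instead of interval().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// JDK-only model of filter(!paused) followed by map(counter++); not RxJava itself.
final class PausableTickModel {
    private final AtomicBoolean paused;
    private final AtomicLong counter = new AtomicLong();
    private final List<Long> emitted = new ArrayList<>();

    PausableTickModel(AtomicBoolean paused) {
        this.paused = paused;
    }

    // Called once per raw interval tick; mirrors the filter and map steps.
    void onRawTick(long rawTick) {
        if (paused.get()) {
            return; // filter: the raw tick is dropped while paused
        }
        emitted.add(counter.getAndIncrement()); // map: renumber what passes
    }

    List<Long> emitted() {
        return emitted;
    }

    // Feeds ten raw ticks, pausing before raw tick 3 and resuming before raw tick 7.
    static List<Long> demo() {
        AtomicBoolean paused = new AtomicBoolean(false);
        PausableTickModel model = new PausableTickModel(paused);
        for (long raw = 0; raw < 10; raw++) {
            if (raw == 3) paused.set(true);
            if (raw == 7) paused.set(false);
            model.onRawTick(raw);
        }
        return model.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 2, 3, 4, 5]
    }
}
```

Raw ticks 3 to 6 are discarded, yet the values seen downstream continue from 3 after the resume, which is what the separate counter is for.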

Edit: June 4, 2016

There is a problem with the above solution. If the same observable instance is reused multiple times, each new subscription starts from the value reached at the last unsubscribe. For example:

Observable<Long> o = pausableInterval(...);
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

Although list1 will be [0,1,2,3,4], list2 will actually be [5,6,7,8,9]. If you do not want this behavior, you must make the observable stateless. This can be achieved with the scan() operator. The revised version may look as follows:

public static final Observable<Long> pausableInterval(
  final AtomicBoolean pause,final long initialDelay,final long period,TimeUnit unit,Scheduler scheduler) {

  return Observable.interval(initialDelay,period,unit,scheduler)
      .filter(tick -> !pause.get())
      .scan((acc,tick) -> acc + 1);
}
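The difference between the two versions comes down to where the running count lives: in one object shared by every subscription, or in state that is created fresh for each subscription. The following is a rough JDK-only analogy, not RxJava code. Each call to get() stands in for one subscription, and CounterScopeDemo, sharedCounter and perRunCounter are hypothetical names chosen for this sketch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// JDK-only analogy (not RxJava): each call to get() stands in for one subscription.
final class CounterScopeDemo {

    // The counter is created once and shared by every run, like the AtomicLong
    // captured by the first pausableInterval version.
    static Supplier<List<Long>> sharedCounter() {
        final AtomicLong counter = new AtomicLong();
        return () -> LongStream.range(0, 5)
                .map(tick -> counter.getAndIncrement())
                .boxed()
                .collect(Collectors.toList());
    }

    // The counter is created inside each run, so every run starts from 0 again.
    static Supplier<List<Long>> perRunCounter() {
        return () -> {
            final AtomicLong counter = new AtomicLong();
            return LongStream.range(0, 5)
                    .map(tick -> counter.getAndIncrement())
                    .boxed()
                    .collect(Collectors.toList());
        };
    }

    public static void main(String[] args) {
        Supplier<List<Long>> shared = sharedCounter();
        System.out.println(shared.get()); // [0, 1, 2, 3, 4]
        System.out.println(shared.get()); // [5, 6, 7, 8, 9]

        Supplier<List<Long>> perRun = perRunCounter();
        System.out.println(perRun.get()); // [0, 1, 2, 3, 4]
        System.out.println(perRun.get()); // [0, 1, 2, 3, 4]
    }
}
```

In the scan() version the accumulator plays the role of the per-run counter, since it is kept per subscription rather than in a field captured when the observable is built.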

Alternatively, if you don't want to rely on Java 8 and lambdas, you can do the same with Java 6 compatible code:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361
