RxJava operators: encapsulating data flows into custom operators
Let's say I'm observing an Observable in a very specific way:
resultObservable = anotherObservable.filter(~Filter code~).take(15);
I want to create a custom operator that combines two predefined operators, such as filter and take, so that it behaves like this:
resultObservable = anotherObservable.lift(new FilterAndTake(15));
Or
resultObservable = anotherObservable.FilterAndTake(15);
So far, I'm comfortable writing a very specific operator that can do this, and I can lift that operator.
However, given my current limited knowledge of RxJava, this would involve rewriting the take and filter functionality every time I need to use it in a custom operator.
That would work, but I'd rather reuse the pre-existing operators maintained by the open source community, as well as operators I've created myself.
Something also tells me that I don't know enough about operators and subscribers.
Can anyone recommend a tutorial other than the RxJava documentation? I ask because, although the documentation explains the general concepts, it presents each concept in isolation from the wider context in which it is used, and it has no examples that inspire more powerful applications of RxJava.
So, specifically:
I'm trying to encapsulate a custom data flow into a single representative operator. Does this functionality exist?
Solution
I'm not aware of any special function (or syntactic sugar) for composing Operator objects, but you can simply create a new operator that composes existing ones:
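    import rx.Observable;
    import rx.Subscriber;
    import rx.functions.Func1;
    // OperatorFilter and OperatorTake live in RxJava 1.x's internal package,
    // so they may change between releases.
    import rx.internal.operators.OperatorFilter;
    import rx.internal.operators.OperatorTake;

    public class FilterAndTake<T> implements Observable.Operator<T, T> {
        private final OperatorFilter<T> filter;
        private final OperatorTake<T> take;

        public FilterAndTake(Func1<? super T, Boolean> predicate, int n) {
            this.filter = new OperatorFilter<T>(predicate);
            this.take = new OperatorTake<T>(n);
        }

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            // Wrap the child with take, then wrap that with filter,
            // so items pass through filter first and take second.
            return filter.call(take.call(child));
        }
    }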
You can then use it as follows:
    public static void main(String[] args) {
        Observable<Integer> xs = Observable.range(1, 8);

        // Keep only the even numbers.
        Func1<Integer, Boolean> predicate = new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer x) {
                return x % 2 == 0;
            }
        };

        Action1<Integer> action = new Action1<Integer>() {
            @Override
            public void call(Integer x) {
                System.out.println("> " + x);
            }
        };

        // Prints "> 2" and then "> 4".
        xs.lift(new FilterAndTake<Integer>(predicate, 2)).subscribe(action);
    }
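The order inside call(...) can look reversed at first: take wraps the child and filter wraps the result, yet filtering happens first. The sketch below tries to show why. It does not use RxJava at all: Sink and Op are hypothetical stand-ins for rx.Subscriber and Observable.Operator, invented here so the example runs with the JDK alone. It also ignores unsubscription, errors, completion and backpressure, all of which the real operators handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class OperatorChainSketch {

    // Hypothetical stand-in for a downstream consumer; NOT rx.Subscriber.
    interface Sink<T> {
        void onNext(T value);
    }

    // Hypothetical stand-in for an operator: given the downstream sink,
    // return the sink that the upstream source should push values into.
    interface Op<T> {
        Sink<T> call(Sink<T> child);
    }

    // Forwards only the values that match the predicate.
    static <T> Op<T> filter(Predicate<T> predicate) {
        return child -> value -> {
            if (predicate.test(value)) {
                child.onNext(value);
            }
        };
    }

    // Forwards at most n values and silently drops the rest.
    static <T> Op<T> take(int n) {
        return child -> {
            int[] seen = {0};
            return value -> {
                if (seen[0] < n) {
                    seen[0]++;
                    child.onNext(value);
                }
            };
        };
    }

    // Same shape as FilterAndTake.call: the child is wrapped by take,
    // and that result is wrapped by filter. The outermost wrapper sees
    // each value first, so values are filtered before they are counted.
    static <T> Op<T> filterAndTake(Predicate<T> predicate, int n) {
        Op<T> filterOp = filter(predicate);
        Op<T> takeOp = take(n);
        return child -> filterOp.call(takeOp.call(child));
    }

    // Pushes 1..8 through the composed operator and collects what arrives.
    public static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Sink<Integer> sink = OperatorChainSketch
                .<Integer>filterAndTake(x -> x % 2 == 0, 2)
                .call(received::add);
        for (int i = 1; i <= 8; i++) {
            sink.onNext(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 4]
    }
}
```

Swapping the nesting to takeOp.call(filterOp.call(child)) would count the first two values (1 and 2) before filtering, so only 2 would arrive.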