RX Java operator; Encapsulate data streams into custom operators

Let's say I'm observing an observer in a very specific way

resultObservable = anotherObservable.filter(~Filter code~).take(15);

I want to create a custom operator that combines two predefined operators, such as filter and take Behave like this

resultObservable = anotherObservable.lift(new FilterAndTake(15));

Or

resultObservable = anotherObservable.FilterAndTake(15);

So far, I'm happy to write a specific operator that can do this I can undo that operator

However, given my current limited knowledge of Rx Java, this will involve rewriting the take and filter functions every time I need to use it in a custom operator

That's good, but I'd rather reuse the pre - existing operators maintained by the open source community and the recycling operators I created

Something else tells me that I don't know enough about operators and subscribers

Can anyone recommend a tutorial that is not an Rx Java document? I say this because, although the document explains general concepts, it isolates the concept and general context of their functions, and there is no example to inspire a more powerful application of Rx Java

So especially

I'm trying to encapsulate a custom data stream into a representative operator Does this function exist?

Solution

I don't know some special functions (or sugars) that make up the operator object But you can simply create a new operator to compose an existing operator

public class FilterAndTake<T> implements Observable.Operator<T,T> {

    private OperatorFilter<T> filter;
    private OperatorTake<T> take;

    public FilterAndTake(Func1<? super T,Boolean> predicate,int n) {
        this.filter = new OperatorFilter<T>(predicate);
        this.take = new OperatorTake<T>(n);
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        return filter.call(take.call(child));
    }
}

You can then use it as follows:

public static void main(String[] args) {
    Observable<Integer> xs = Observable.range(1,8);

    Func1<Integer,Boolean> predicate = new Func1<Integer,Boolean>() {
        @Override
        public Boolean call(Integer x) {
            return x % 2 == 0;
        }
    };

    Action1<Integer> action = new Action1<Integer>() {
        @Override
        public void call(Integer x) {
            System.out.println("> " + x);
        }
    };

    xs.lift(new FilterAndTake<Integer>(predicate,2)).subscribe(action);
}
The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>