How does rxjava’s observable “iteration” work?

I started playing with RX Java and reactfx, and I became very fascinated But I have dozens of questions and I keep studying the answers

One thing I observe (not a pun) is, of course, lazy execution In the following exploratory code, I noticed that nothing was executed until merge Subscribe (PET – > system. Out. Println (PET)) is called But what fascinates me is when I subscribe to the second subscriber merge When subscribe (PET – > system.out.println ("feed" PET)), it starts iteration again

What I want to understand is the behavior of iterations It doesn't seem like a Java 8 stream that can only be used once It goes through each string line by line and publishes it as the value at that time? Before any previously published subscriber receives new users of these items, if they are new?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher","Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby","Grumpy Cat","Meowmers","Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}

Solution

The observable < T > represents a monad, a linked operation, rather than the execution of the operation itself It's descriptive language, not commands you're used to To perform an operation, you Subscribe() to it Every time you subscribe to a new execution flow, you create it from scratch Unless you use Subscribeon() or Observeon() specifies the thread change, otherwise do not confuse the flow with the thread, because the subscription will be executed synchronously You can link the new element to any existing operation / monad / observable to add new behavior, such as changing threads, filtering, accumulation, transformation, etc If your observability is an expensive operation, you can prevent it from being used if you do not want to repeat it on each subscription Cache () for entertainment

To make any asynchronous / synchronous observable < use Toblocking() changes its type to blockingobservable < T > instead of. Subscribe() which contains Foreach() is a new method that performs the operation of each result, or is mandatory first()

Observability is a good tool because they are mainly deterministic (the same input always produces the same output unless you do something wrong), reusable (can be sent as part of the command / policy pattern), and most ignore consent because they should not rely on a common country (that is, do something wrong) Blocking observables is good. If you try to convert an observable library into an imperative language, or just perform an observable operation, you have 100% confidence that it is well managed

Building your application around these principles is a paradigm change, and I can't really answer that

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>