How does rxjava’s observable “iteration” work?
I started playing with RX Java and reactfx, and I became very fascinated But I have dozens of questions and I keep studying the answers
One thing I observe (not a pun) is, of course, lazy execution In the following exploratory code, I noticed that nothing was executed until merge Subscribe (PET – > system. Out. Println (PET)) is called But what fascinates me is when I subscribe to the second subscriber merge When subscribe (PET – > system.out.println ("feed" PET)), it starts iteration again
What I want to understand is the behavior of iterations It doesn't seem like a Java 8 stream that can only be used once It goes through each string line by line and publishes it as the value at that time? Before any previously published subscriber receives new users of these items, if they are new?
public class RxTest { public static void main(String[] args) { Observable<String> dogs = Observable.from(ImmutableList.of("Dasher","Rex")) .filter(dog -> dog.matches("D.*")); Observable<String> cats = Observable.from(ImmutableList.of("Tabby","Grumpy Cat","Meowmers","Peanut")); Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey")); Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets); merge.subscribe(pet -> System.out.println(pet)); merge.subscribe(pet -> System.out.println("Feed " + pet)); } }
Solution
The observable < T > represents a monad, a linked operation, rather than the execution of the operation itself It's descriptive language, not commands you're used to To perform an operation, you Subscribe() to it Every time you subscribe to a new execution flow, you create it from scratch Unless you use Subscribeon() or Observeon() specifies the thread change, otherwise do not confuse the flow with the thread, because the subscription will be executed synchronously You can link the new element to any existing operation / monad / observable to add new behavior, such as changing threads, filtering, accumulation, transformation, etc If your observability is an expensive operation, you can prevent it from being used if you do not want to repeat it on each subscription Cache () for entertainment
To make any asynchronous / synchronous observable < use Toblocking() changes its type to blockingobservable < T > instead of. Subscribe() which contains Foreach() is a new method that performs the operation of each result, or is mandatory first()
Observability is a good tool because they are mainly deterministic (the same input always produces the same output unless you do something wrong), reusable (can be sent as part of the command / policy pattern), and most ignore consent because they should not rely on a common country (that is, do something wrong) Blocking observables is good. If you try to convert an observable library into an imperative language, or just perform an observable operation, you have 100% confidence that it is well managed
Building your application around these principles is a paradigm change, and I can't really answer that