RxJava: running an Observable's operators on a background thread
I'm using RxAndroid for stream operations. In my real use case, I fetch a list from the server (using Retrofit). I use schedulers to do the work on a background thread and receive the final emission on the Android UI (main) thread.
This works well for the network call, but I realized that the operators I apply after the network call do not run on the background thread; they are invoked on the main thread.
    myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});
How do I ensure that all operations are performed on the background thread?
Solution
TL;DR: move observeOn(AndroidSchedulers.mainThread()) below filter(...).
subscribeOn(...) designates the thread on which the Observable starts operating. If subscribeOn appears more than once in a chain, only the call closest to the source decides where the source runs; the later calls have no practical effect.
So if you write the following, everything is executed on Schedulers.newThread():
    myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> { doSomething(integer1); });
Of course, this is not what you want either: you want doSomething to run on the main thread. That is where observeOn comes in. Every operation after an observeOn is executed on that observeOn's scheduler. That is why, in your example, the filter runs on the main thread.
Instead, move observeOn down to just before subscribe:
    myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1); });
Now the filter runs on the new thread, and doSomething runs on the main thread.
To go even further, you can use observeOn more than once:
    myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1); });
In this case, the fetch happens on a new thread, the filtering on a computation thread, and doSomething on the main thread.
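If you want to watch this kind of thread hand-off outside an Android project, the sketch below mimics the same three-stage layout using only the JDK. It is an analogy, not RxJava: CompletableFuture stands in for the Observable chain, and three single-thread executors stand in for Schedulers.newThread(), Schedulers.computation() and AndroidSchedulers.mainThread(). The class name ThreadHopDemo, the thread names "fetch", "computation" and "main", the hard-coded list and the even-number filter are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ThreadHopDemo {

    // Single-thread executor whose only thread carries the given name (hypothetical helper).
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Runs fetch -> filter -> consume and returns a log of which thread ran each stage.
    static List<String> run() {
        ExecutorService fetch = named("fetch");             // stand-in for subscribeOn(newThread())
        ExecutorService computation = named("computation"); // stand-in for observeOn(computation())
        ExecutorService main = named("main");               // stand-in for observeOn(mainThread())
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                .supplyAsync(() -> {
                    log.add("fetch on " + Thread.currentThread().getName());
                    return Arrays.asList(1, 2, 3, 4); // placeholder for the server response
                }, fetch)
                .thenApplyAsync(list -> {
                    log.add("filter on " + Thread.currentThread().getName());
                    return list.stream().filter(i -> i % 2 == 0).collect(Collectors.toList());
                }, computation)
                .thenAcceptAsync(evens -> {
                    log.add("consume " + evens + " on " + Thread.currentThread().getName());
                }, main)
                .join(); // block until the last stage has finished
        } finally {
            fetch.shutdown();
            computation.shutdown();
            main.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each stage names its executor explicitly here, whereas in RxJava an observeOn call applies to every operator below it until the next observeOn. The point of the sketch is only to make the resulting hand-offs visible by thread name.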
For more details, check out the official ReactiveX documentation for the subscribeOn operator.