RxJava rx.exceptions.MissingBackpressureException with filter and map
I've just started using RxJava / RxAndroid, and I have some questions about how to deal with backpressure correctly.
I have a file scanner Observable that scans directories and emits files. These files should be processed as quickly as possible without skipping any of them.
Unfortunately, I got an rx.exceptions.MissingBackpressureException. So I read up on backpressure. If I understand it correctly, the only lossless options are buffer and window.
I tried onBackpressureBuffer(), buffer() and window(). While none of the onBackpressureXXX() operators seemed to have any effect, buffer() groups the items into a List<File>. My questions are:
> How do I filter these groups? A filter over List<File> (Func1<List<File>, Boolean>) makes no sense...
> How can I implement backpressure handling in my file scanner Observable so that it waits until my pipeline / operators / subscriber have capacity?
> Is it good practice to, for example, convert items in map() to XYZ entities and store them in a separate list as a side effect of the operator, rather than in the actual subscriber?
Any feedback or tips would be helpful and appreciated.
Solution:
I think I found a solution to the problem. This code does not work:
Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    // the buffer is applied only after the thread hop
    .onBackpressureBuffer(10000)
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getAbsolutePath().endsWith("xyz");
        }
    })
    .buffer(100)
    .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ });
But this code works properly:
Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
    // buffer, filter and batch first, then switch threads
    .onBackpressureBuffer(10000)
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getAbsolutePath().endsWith("xyz");
        }
    })
    .buffer(100)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ });
So it seems that where subscribeOn() and observeOn() are placed in the chain makes a huge difference!
My third question is off topic but still open. Maybe someone can comment on it.
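A likely explanation, as far as I understand RxJava 1.x: observeOn() hands items to the target thread through a small bounded queue, and if the source keeps emitting without respecting how much that queue can take, it overflows and MissingBackpressureException is raised. An onBackpressureBuffer() placed after observeOn() comes too late to protect that queue, while one placed before it absorbs the burst. The sketch below is not RxJava code; it is a plain java.util.concurrent model of the same idea, and every name in it (BoundedHandoffSketch, countRejected, countDelivered, the capacity of 16) is an assumption made for illustration. It contrasts a producer that ignores capacity with one that waits until the consumer has room.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedHandoffSketch {

    // Producer that ignores capacity: offer() returns false once the queue is full.
    // Nothing drains the queue here, so every item beyond the capacity is rejected.
    // This models a source that overruns a bounded hand-off queue.
    static int countRejected(int items, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < items; i++) {
            if (!queue.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    // Producer that waits for capacity: put() blocks until the consumer
    // has taken an item and freed a slot, so no item is lost.
    static int countDelivered(int items, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int[] delivered = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();
                    delivered[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < items; i++) {
                queue.put(i);
            }
            // join() makes the consumer's writes visible to this thread
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing off items", e);
        }
        return delivered[0];
    }

    public static void main(String[] args) {
        System.out.println("rejected when capacity is ignored: " + countRejected(1000, 16));
        System.out.println("delivered when the producer waits: " + countDelivered(1000, 16));
    }
}
```

In the working chain above, onBackpressureBuffer(10000) plays the role of a large buffer in front of the bounded hand-off, and buffer(100) further reduces how many items have to cross the thread boundary.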
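On the first question, the working chain already suggests an answer: filter the individual File items first and only then group them with buffer(100), so no filter over List<File> is needed. Below is a minimal plain-Java sketch of that filter-then-batch ordering. It does not use RxJava, it works on path strings instead of File objects, and the names FilterThenBatchSketch and filterThenBatch are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterThenBatchSketch {

    // Keeps only the paths ending with the given suffix, then groups the
    // survivors into lists of at most batchSize items. A final partial
    // batch is kept as well, so no matching path is dropped.
    static List<List<String>> filterThenBatch(List<String> paths, String suffix, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String path : paths) {
            if (!path.endsWith(suffix)) {
                continue; // filtered out before it ever reaches a batch
            }
            current.add(path);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            // every second made-up path ends with "xyz"
            paths.add("/mnt/file" + i + (i % 2 == 0 ? ".xyz" : ".tmp"));
        }
        List<List<String>> batches = filterThenBatch(paths, "xyz", 50);
        System.out.println(batches.size() + " batches");
    }
}
```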