Talk about exceptions and handling methods in rxjava2

preface

As we all know, when an exception is thrown in a chain call in rxjava2, if there is no corresponding consumer to handle the exception, the exception will be thrown into the virtual machine. The direct manifestation on Android is crash and program crash.

Subscription mode

Before we talk about exception handling, let's take a look at the observable subscription method subscribe() in rxjava2 and several common subscription methods:

// 1
subscribe()
// 2
Disposable subscribe(Consumer<? super T> onNext)
// 3
Disposable subscribe(Consumer<? super T> onNext,Consumer<? super Throwable> onError)
// 4
Disposable subscribe(Consumer<? super T> onNext,Consumer<? super Throwable> onError,Action onComplete)
// 5
Disposable subscribe(Consumer<? super T> onNext,Action onComplete,Consumer<? super Disposable> onSubscribe)
// 6
void subscribe(Observer<? super T> observer)

Several methods that have no parameters and take consumer as parameter call the fifth method internally by supplementing the default parameters, while the sixth method is called internally by wrapping the parameters into observer through lambdaobserver

 public final Disposable subscribe(Consumer<? super T> onNext,Consumer<? super Disposable> onSubscribe) {
 ObjectHelper.requireNonNull(onNext,"onNext is null");
 ObjectHelper.requireNonNull(onError,"onError is null");
 ObjectHelper.requireNonNull(onComplete,"onComplete is null");
 ObjectHelper.requireNonNull(onSubscribe,"onSubscribe is null");

 LambdaObserver<T> ls = new LambdaObserver<T>(onNext,onError,onComplete,onSubscribe);

 subscribe(ls);

 return ls;
 }

Therefore, there is no difference between using the consumer parameter and the observer parameter for subscription except that the callback source is different. However, because of this difference, there will be differences in the processing results when exceptions occur

exception handling

We simulate exceptions in the following ways:

1. Exception thrown in observer onnext (thread switching)

 apiService.newJsonKeyData()
  .doOnSubscribe { t -> compositeDisposable.add(t) }
  .compose(RxScheduler.sync()) // 封装的线程切换
  .subscribe(object : Observer<List<ZooData>> {
  override fun onComplete() {

  }

  override fun onSubscribe(d: Disposable) {

  }

  override fun onNext(t: List<ZooData>) {
  throw RuntimeException("runtime exception")
  }

  override fun onError(e: Throwable) {
  Log.d("error",e.message)
  }

  })

Result: onerror will not be triggered and app crashes

2. Exception thrown in observer onnext (thread not switched)

  Observable.create<String> {
   it.onNext("ssss")
   }
    .subscribe(object : Observer<String> {
    override fun onComplete() {

    }

    override fun onSubscribe(d: Disposable) {

    }

    override fun onNext(t: String) {
     Log.d("result::",t)
     throw RuntimeException("run llllll")
    }

    override fun onError(e: Throwable) {
     Log.e("sss","sss",e)
    }

    })

Result: onerror will be triggered and app does not crash

3. Exception thrown in observer map operator

  apiService.newJsonKeyData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .map {
   throw RuntimeException("runtime exception")
   }
   .compose(RxScheduler.sync())
   .subscribe(object : Observer<List<ZooData>> {
   override fun onComplete() {

   }

   override fun onSubscribe(d: Disposable) {

   }

   override fun onNext(t: List<ZooData>) {

   }

   override fun onError(e: Throwable) {
    Log.d("error",e.message)
   }

   })

Result: the onerror of observer will be triggered, and the app does not crash

4. Exception thrown in consumer onnext

  apiService.newJsonKeyData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .compose(RxScheduler.sync())
   .subscribe({
   throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
   },{
   Log.d("Error",it.message)
   })

Result a: an errorconsumer triggered the errorconsumer, but the app did not crash

 apiService.newJsonKeyData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .compose(RxScheduler.sync())
   .subscribe {
   throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
   }

Result B: no errorconsumer, APP crashes

So why are these different situations? Let's find out from the source code.

Crash and non crash of consumer subscription mode

Subscribe() passes in consumer type parameters. Finally, in observable, the passed in parameters will be converted to lambdaobserver, and then subscribe (lambdaobserver) will be called to subscribe. Expand lambdaobserver: (mainly see the processing in onnext and onerror methods)

		.
		.
		.
		 @Override
 public void onNext(T t) {
 if (!isDisposed()) {
  try {
  onNext.accept(t);
  } catch (Throwable e) {
  Exceptions.throwIfFatal(e);
  get().dispose();
  onError(e);
  }
 }
 }

 @Override
 public void onError(Throwable t) {
 if (!isDisposed()) {
  lazySet(DisposableHelper.DISPOSED);
  try {
  onError.accept(t);
  } catch (Throwable e) {
  Exceptions.throwIfFatal(e);
  RxJavaPlugins.onError(new CompositeException(t,e));
  }
 } else {
  RxJavaPlugins.onError(t);
 }
 }
		.
		.
		.

The apply () method corresponding to consumer is invoked in onNext and try catch is carried out. Therefore, when we work in the consumer, the exception thrown will be caught and trigger the onerror of lambdaobserver. In onerror, if the subscription is not cancelled and errorconsumer's apply() execution is normal, the event flow can be completed normally. Otherwise, rxjavaplugins.onerror (T) will be called. You can see from here that when errorconsumer is not passed in during subscription, observable will specify onerrormissingconsumer as the default errorconsumer, and throw onerrornotimplementedexception when an exception occurs.

RxJavaPlugins.onError(t)

According to the above analysis, it is found that the exception will eventually flow to rxjavaplugins. Onerror (T). This method provides a global static method for rxjava2.

 public static void onError(@NonNull Throwable error) {
 Consumer<? super Throwable> f = errorHandler;

 if (error == null) {
  error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
 } else {
  if (!isBug(error)) {
  error = new UndeliverableException(error);
  }
 }

 if (f != null) {
  try {
  f.accept(error);
  return;
  } catch (Throwable e) {
  // Exceptions.throwIfFatal(e); TODO decide
  e.printStackTrace(); // NOPMD
  uncaught(e);
  }
 }

 error.printStackTrace(); // NOPMD
 uncaught(error);
 }

Looking at its source code, it is found that when errorhandler is not empty, the exception will be consumed by it. If it is empty or a new exception is generated in the consumption process, rxjava will throw the exception to the virtual machine (which may lead to program crash). Errorhandler itself is a consumer object. We can configure it in the following ways:

 RxJavaPlugins.setErrorHandler(object : Consumer1<Throwable> {
 override fun accept(t: Throwable?) {
  TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
 }

 })

Exception thrown in data operator

Take the map operator as an example. In fact, rxjava hooks the event flow to another new observable observable map

 @CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final <R> Observable<R> map(Function<? super T,? extends R> mapper) {
 ObjectHelper.requireNonNull(mapper,"mapper is null");
 return RxJavaPlugins.onAssembly(new ObservableMap<T,R>(this,mapper));
 }

Enter the observablemap class and find that an internal static class mapobserver is subscribed to internally. Focus on the onnext method of mapobserver

 public void onNext(T t) {
  if (done) {
  return;
  }

  if (sourceMode != NONE) {
  downstream.onNext(null);
  return;
  }

  U v;

  try {
  v = ObjectHelper.requireNonNull(mapper.apply(t),"The mapper function returned a null value.");
  } catch (Throwable ex) {
  fail(ex);
  return;
  }
  downstream.onNext(v);
 }

In onnext, try catch mapper. Apply(), which executes the function method we implemented in the operator. Therefore, exceptions generated in data transformation operators such as map can be captured by themselves and sent to the final observer. If exceptions can be consumed in the subscription object at this time, the event flow will normally end with onerror(). If the subscription method is the consumer in the previous section, the crash situation is the analysis result in the previous section.

Exception thrown in onnext of observer

The above method 1 is a network request, which involves thread switching. Method 2 is to directly create an observable object, which does not involve thread switching. The result is that after thread switching, the observer throws an exception in the onnext() method of observer and cannot trigger onerror(), and the program crashes.

Observable.create for thread not switched

Looking at the source code of the create () method, we found that an observablecreate object was created internally, and the subscribeactual () method will be triggered when calling the subscription. In subscribeactual(), we call the subscribe() method of the observableonsubscribe object passed in during create to trigger the event flow.

 @Override
 protected void subscribeActual(Observer<? super T> observer) {

		// 对我们的观察者使用 CreateEmitter 进行包装,内部的触发方法是相对应的
 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 observer.onSubscribe(parent);

 try {
			// source 为 create 时创建的 ObservableOnSubscribe 匿名内部接口实现类
  source.subscribe(parent);
 } catch (Throwable ex) {
  Exceptions.throwIfFatal(ex);
  parent.onError(ex);
 }
 }

The subscription process in the above code is wrapped using try catch tonight. The subscription and the event flow sent after the subscription trigger are in one thread, so the exceptions in the whole event flow can be caught. (PS: you can try to use observeon() to switch the event sending thread. You will find that exceptions can no longer be caught and the program crashes)

Exception handling involving thread transformation

The observable object returned by the retrofit network request is essentially the bodyobservable generated in the rxjava2calladapter, and the onnext within the period is not exception captured. In fact, whether to capture here is not the root cause of program crash, because network requests must involve thread switching. Even if try catch is handled here, exceptions downstream of the event stream cannot be caught.

 @Override public void onNext(Response<R> response) {
 if (response.isSuccessful()) {
 observer.onNext(response.body());
 } else {
 terminated = true;
 Throwable t = new HttpException(response);
 try {
  observer.onError(t);
 } catch (Throwable inner) {
  Exceptions.throwIfFatal(inner);
  RxJavaPlugins.onError(new CompositeException(t,inner));
 }
 }
 }

Take the onnext exception thrown in the final observer as an example. To catch this exception, you must catch it in the final calling thread. That is, the Android main thread switched from. Observeon (androidschedulers. Mainthread()). Like other operators, a new set of subscription relationships is generated during thread switching, and rxjava will create a new observation object observableobserveon.

 @Override
 public void onNext(T t) {
  if (done) {
  return;
  }

  if (sourceMode != QueueDisposable.ASYNC) {
  queue.offer(t);
  }
  schedule();
 }
		.
		.
		.
		void schedule() {
  if (getAndIncrement() == 0) {
  worker.schedule(this); // 执行 ObservableObserveOn 的 run 方法
  }
 }
		.
		.
		.
	 @Override
 public void run() {
  if (outputFused) {
  drainFused();
  } else {
  drainNormal();
  }
 }
	

The worker executing the task is the worker created by the corresponding implementation subclass of the corresponding thread scheduler. Take androidschedulers. Mainthread() as an example, the scheduler implementation class is handlerscheduler, and its corresponding worker is handlerworker. Finally, the task is handed over to scheduledrunnable for execution.

 private static final class ScheduledRunnable implements Runnable,Disposable {
 private final Handler handler;
 private final Runnable delegate;

 private volatile boolean disposed; // Tracked solely for isDisposed().

 ScheduledRunnable(Handler handler,Runnable delegate) {
  this.handler = handler;
  this.delegate = delegate;
 }

 @Override
 public void run() {
  try {
  delegate.run();
  } catch (Throwable t) {
  RxJavaPlugins.onError(t);
  }
 }

 @Override
 public void dispose() {
  handler.removeCallbacks(this);
  disposed = true;
 }

 @Override
 public boolean isDisposed() {
  return disposed;
 }
 }

You will find that try catch is performed in run. However, the global exception handling rxjavaplugins. Onerror (T) is used to digest exceptions in catch;, Instead of an observer's onerror. Therefore, after switching the thread operator, the observer cannot catch the exception thrown in onnext.

Treatment scheme

Now that we know the problem, the solution to the problem is very clear.

1. Register global exception handling

 RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
  override fun accept(t: Throwable?) {
  // do something
  }

 })

2. When the consumer is an observer, it is not completely sure that there are no exceptions. You must add an exception handling consumer

 apiService.stringData()
   .doOnSubscribe { t -> compositeDisposable.add(t) }
   .compose(RxScheduler.sync())
   .subscribe(Consumer<Boolean>{ },Consumer<Throwable> { })

3. Observer can create a baseobserver to artificially transfer the try catch in onnext to onerror. The subclass of this baseobserver is used for observation in the project.

 @Override
 public void onNext(T t) {
 try {
  onSuccess(t);
 } catch (Exception e) {
  onError(e);
 }
 data = t;
 success = true;
 }

summary

The above is the whole content of this article. I hope the content of this article has a certain reference value for your study or work. Thank you for your support.

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>