RxJava2 + OkHttp Невозможно доставить исключение: время ожидания сокета - PullRequest
1 голос
/ 22 марта 2019

Я вижу следующее падение на Crashlytics:

Fatal Exception: io.reactivex.exceptions.UndeliverableException: java.net.SocketTimeoutException: connect timed out
       at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.disposeAll(FlowableFlatMap.java:590)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.cancel(FlowableFlatMap.java:354)
       at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.cancel(FlowableSubscribeOn.java:141)
       at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189)
       at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestInnerSubscriber.cancel(FlowableCombineLatest.java:540)
       at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.cancelAll(FlowableCombineLatest.java:454)
       at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.cancel(FlowableCombineLatest.java:209)
       at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.dispose(FlowableFlatMap.java:690)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:602)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:668)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onError(FlowableSubscribeOn.java:102)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:566)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:374)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:605)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:668)
       at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onError(SingleToFlowable.java:68)
       at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onError(ObservableElementAtSingle.java:104)
       at io.reactivex.internal.util.HalfSerializer.onError(HalfSerializer.java:133)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.innerError(ObservableRetryWhen.java:132)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver$InnerRepeatObserver.onError(ObservableRetryWhen.java:172)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.checkTerminate(ObservableFlatMap.java:495)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:331)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:571)
       at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
       at io.reactivex.internal.operators.observable.ObservableError.subscribeActual(ObservableError.java:37)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
       at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.drain(ObservableZip.java:205)
       at io.reactivex.internal.operators.observable.ObservableZip$ZipObserver.onNext(ObservableZip.java:276)
       at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:309)
       at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:230)
       at io.reactivex.subjects.SerializedSubject.onNext(SerializedSubject.java:104)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.onError(ObservableRetryWhen.java:106)
       at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onError(SingleToObservable.java:65)
       at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onError(SingleDoOnSuccess.java:64)
       at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onError(SingleMap.java:69)
       at io.reactivex.internal.operators.observable.ObservableSingleSingle$SingleElementObserver.onError(ObservableSingleSingle.java:95)
       at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onError(BodyObservable.java:72)
       at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:56)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:34)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleDoOnSuccess.subscribeActual(SingleDoOnSuccess.java:35)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleToFlowable.subscribeActual(SingleToFlowable.java:37)
       at io.reactivex.Flowable.subscribe(Flowable.java:13234)
       at io.reactivex.Flowable.subscribe(Flowable.java:13180)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
       at io.reactivex.internal.operators.flowable.FlowableFromArray$ArraySubscription.slowPath(FlowableFromArray.java:164)
       at io.reactivex.internal.operators.flowable.FlowableFromArray$BaseArraySubscription.request(FlowableFromArray.java:89)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
       at io.reactivex.internal.operators.flowable.FlowableFromArray.subscribeActual(FlowableFromArray.java:37)
       at io.reactivex.Flowable.subscribe(Flowable.java:13234)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
       at io.reactivex.Flowable.subscribe(Flowable.java:13234)
       at io.reactivex.Flowable.subscribe(Flowable.java:13180)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
       at java.lang.Thread.run(Thread.java:761)

Теперь согласно официальной документации это потому, что где-то в некоторой цепочке RX исключение не может быть доставлено и, следовательно, вместосокрытие этого Rx обрабатывает это, вызывая крах.

Я знаю, что мог бы просто избежать этого поведения, используя

RxJavaPlugins.setErrorHandler(e -> { });

, но я бы лучше нашел источник проблемы.Однако нигде в журнале исключений я не вижу фактический запрос API или вызов метода, который вызывает это, только трассировка стека от Rx и Okhttp / retrofit.

Мое приложение довольно большое, поэтому мне нужно просмотреть все мои репозитории, чтобы увидеть, где я мог пропустить обработку onError.

Есть ли лучший способ отладить эту проблему?

1 Ответ

1 голос
/ 25 марта 2019

Как указано в комментариях к вопросу, мне приходилось сталкиваться с подобной проблемой.Наша проблема была в нашем приложении для Android.Сетевые вызовы могут занять слишком много времени, и пользователи отправят приложение в фоновый режим.Когда это происходит, мы избавляемся от подписок.Когда происходит тайм-аут сокета, никто не прослушивает исключение, и это вызывает UndeliverableException.

Мы заменили обработчик ошибок по умолчанию на (он в kotlin, я надеюсь, что это нормально):

private object DefaultErrorHandler : Consumer<Throwable> {
  override fun accept(t: Throwable) {
    when (t) {
        is UndeliverableException -> accept(t.cause!!)
        is NullPointerException,
        is IllegalArgumentException -> Thread.currentThread().run {
            uncaughtExceptionHandler.uncaughtException(this, t)
        }
        else -> // Swallow the exception here. We logged it to Crashlytics...
    }
  }
}

val defaultErrorHandler: Consumer<Throwable> = DefaultErrorHandler

// Then on application start we would replace the error handler
RxJavaPlugins.setErrorHandler(defaultErrorHandler)

Я почти уверен, defaultErrorHandler это ужасное имя.Извините за это.

Немного объяснений.Исключения, которые мы не проглатываем, это NullPointerException и IllegalArgumentException.Они передаются обработчику неперехваченных исключений текущего потока.Мы сделали это, потому что они обычно связаны с ошибками программирования.

Мы проверяем на UndeliverableExceptions и разворачиваем их, чтобы снова выполнить через того же потребителя.Это просто для того, чтобы убедиться, что мы используем правильную логику для исключения, которое не может быть доставлено.

Все остальные исключения проглатываются и регистрируются в аварийных ситуациях для дальнейшей оценки.

Одна ключевая вещь,это работает для наших случаев использования.Возможно, вам нужно адаптировать его к вашему.Я не говорю, что это лучший способ сделать это.Это пример.Возможно, вы хотите просто игнорировать время ожидания сокета.

...