Сбой плоской карты RxJava, даже если присутствует ошибка onError - PullRequest
0 голосов
/ 08 октября 2018

Я пытаюсь отладить сбой на Android, который происходит, когда устройство не имеет сетевого подключения.Авария происходит только иногда (~ 50% времени), и я не уверен, почему.Если кто-то может объяснить, что происходит, или дать решение, я буду очень признателен.

Я использую плоскую карту для параллельной выборки изображений, и я подписываюсь на блок onError ().Несмотря на это, приложение по-прежнему аварийно завершает работу.

Вот код (я пометил строку, на которой он падает, с помощью «Crash here:»):

fun downloadImages() {
    var progress = 0
    var maxProgress = 0

    downloadModel.postValue(
            Resource.inProgress(getString(R.string.downloading_offline_data)))

    disposables.add(Observable
            .create<Pair<String, File>> { emitter ->
                val files = readFileDirs()

                for (file in files) {
                  val urlList = readUrlListFrom(file)
                  maxProgress += urlList.size

                  for (url in urlList) {
                    emitter.onNext(url to file)
                  }
                }
                emitter.onComplete()
            }
            .flatMap { (url, file) ->
                downloadImage(url, file)
                        .subscribeOn(Schedulers.computation())
                        .andThen(Observable.fromCallable {
                            ++progress
                        })
            }
            .doOnDispose {
                downloadModel.postValue(Resource.idle())
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.d(TAG, "Done downloading $it")

                if (it % 100 == 0) {
                    refreshCacheSize()
                }

                downloadModel.postValue(Resource.inProgress(
                        getString(R.string.downloading_offline_data),
                        it,
                        maxProgress
                ))
            }, { error ->
                val errorId = when (error) {
                    is SocketTimeoutException ->
                        NETWORK_TIMEOUT_ERROR
                    is InterruptedIOException -> {
                        // do nothing
                        return@subscribe
                    }
                    is UnknownHostException ->
                        NETWORK_ERROR
                    is IOException ->
                        UNKNOWN_ERROR
                    is UnsupportedServerVersionException ->
                        UNSUPPORTED_VERSION_ERROR
                    else -> {
                        UNKNOWN_ERROR
                    }
                }
                downloadModel.postValue(Resource.error(errorId))
            }, {
                refreshCacheSize()

                downloadModel.postValue(Resource.success())
            })
    )
}

private fun downloadImage(url: String, outFile: File): Completable {
    val outFileTmp = File(outFile.canonicalPath + ".tmp")
    return Completable
            .create { emitter ->
                val request = Request.Builder()
                        .url(url)
                        .build()
                try {
Crash here:          val response = Client.get().newCall(request).execute()
                    val sink: BufferedSink
                    try {
                        Log.d(TAG, "Writing to file")
                        val body = response.body()
                        if (body != null) {
                            sink = Okio.buffer(Okio.sink(outFileTmp))
                            sink.writeAll(body.source())
                            sink.close()
                        } else {
                            if (!emitter.isDisposed) {
                                emitter.onError(DownloadImageException("Body was null."))
                            }
                        }
                    } finally {
                        response.body()?.close()
                    }

                    outFileTmp.renameTo(outFile)

                    emitter.onComplete()
                } catch (e: Exception) {
                    Log.e(TAG, "Could not save splash", e)
                    emitter.tryOnError(e)
                }
            }
            .doOnDispose {
                outFileTmp.delete()
            }
            .doOnError {
                outFileTmp.delete()
            }
}

Трассировка стека:

2018-10-08 02:50:07.055 21984-22263/com.ggstudios.lolcatalyst E/AndroidRuntime: FATAL EXCEPTION: RxComputationThreadPool-2
    Process: com.ggstudios.lolcatalyst, PID: 21984
    java.lang.Throwable: Unable to resolve host "mywebsite.com": No address associated with hostname
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:157)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:105)
        at java.net.InetAddress.getAllByName(InetAddress.java:1154)
        at okhttp3.Dns$1.lookup(Dns.java:40)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:185)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:149)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at com.ggstudios.lolcatalyst.offline.OfflineDownloadViewModel$downloadImage$1.subscribe(OfflineDownloadViewModel.kt:372)
        at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39)
        at io.reactivex.Completable.subscribe(Completable.java:1918)
        at hu.akarnokd.rxjava2.debug.CompletableOnAssembly.subscribeActual(CompletableOnAssembly.java:39)
        at io.reactivex.Completable.subscribe(Completable.java:1918)
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64)
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:764)
     Caused by: java.lang.Throwable: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
        at libcore.io.Linux.android_getaddrinfo(Native Method)
        at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:172)
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:137)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:105) 
        at java.net.InetAddress.getAllByName(InetAddress.java:1154) 
        at okhttp3.Dns$1.lookup(Dns.java:40) 
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:185) 
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:149) 
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84) 
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214) 
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135) 
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114) 
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200) 
        at okhttp3.RealCall.execute(RealCall.java:77) 
        at com.ggstudios.lolcatalyst.offline.OfflineDownloadViewModel$downloadImage$1.subscribe(OfflineDownloadViewModel.kt:372) 
        at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39) 
        at io.reactivex.Completable.subscribe(Completable.java:1918) 
        at hu.akarnokd.rxjava2.debug.CompletableOnAssembly.subscribeActual(CompletableOnAssembly.java:39) 
        at io.reactivex.Completable.subscribe(Completable.java:1918) 
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64) 
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) 
        at java.lang.Thread.run(Thread.java:764) 

1 Ответ

0 голосов
/ 11 октября 2018

Я выяснил причину и некоторые возможные решения.

Давайте сначала начнем с причины.

Мой код можно упростить так: Observable # 1запускает n других Observables, которые выполняются в других потоках через flatMap().

. Для простоты давайте предположим, что все Observables выполняются в своем собственном потоке.Затем, когда одна наблюдаемая выдает ошибку, ошибка распространяется на все другие наблюдаемые и удаляет их.

Проблема заключается в том, что это распространение не синхронизировано (по-видимому, из-за соображений производительности).Из-за этого tryOnError() может пройти проверку isDisposed(), затем утилизироваться и затем вызвать ошибку.Это объясняет, почему исключение все еще возникает, даже если используется tryOnError().

Некоторые решения

В конце концов, после некоторых исследований и дискуссий, я пришел к двум основным решениям.

Один из способов - преобразовать все ошибки в результаты в Observables через onErrorReturn().Это решение работает, но может не подойти, если вы хотите остановить выполнение при появлении ошибки.

Другой способ - зарегистрировать глобальный обработчик ошибок и просто игнорировать эти ошибки.

Я пошел с последним решением, однако я не полностью удовлетворен.Мне бы очень хотелось, чтобы был способ зарегистрировать локальный обработчик ошибок.Например.есть метод на Observables, чтобы игнорировать ошибки, испускаемые после утилизации Observable.

Ну хорошо.

...