RxJava: Как поддерживать Observable живым даже после получения ошибки в onError () или ReSubscribe той же Observable - PullRequest
0 голосов
/ 20 декабря 2018

На самом деле я создал конфигурацию типа RxSearch.В котором я прикрепил EditText textChangeListener с PublishSubject.Использование событий для отправки символов в Observable, который используется в качестве входных данных для вызова API модернизации.

Проблема

Единственная проблема, с которой я сталкиваюсь, - это когда-то я получаю сообщение об ошибке API "неожиданный конец потока" внутри обратного вызова onError () из наблюдаемого.Как только я получил ошибку, Observable перестает работать.Наблюдаемый завершает работу, не может получить символы из onNext () PublishSubject.

Посмотрите на RxSearchObservable

class RxSearchObservable {
companion object {
    fun fromView(editText: EditText): Observable<String> {
        val subject = PublishSubject.create<String>()
        editText.addTextChangedListener(object : TextWatcher {
            override fun afterTextChanged(s: Editable?) {
                //subject.onComplete()
            }

            override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
                //subject.onNext(s.toString())
            }

            override fun onTextChanged(s: CharSequence, start: Int, before: Int, count: Int) {
                if (s.isNotEmpty()) subject.onNext(s.toString())
            }
        })
        return subject
    }
}
}

Как я подписываюсь и вызываю Retrofit API, вызывая всторона SwitchMap.

 RxSearchObservable.fromView(edtToolSearch)
                    .debounce(700, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .retryWhen { t -> t.delay(3, TimeUnit.SECONDS) }
                    .switchMap { searchTerm ->
                        runOnUiThread { progressBar.visibility = View.VISIBLE }
                        apiManager.getSearchUnits(searchTerm)
                    }
                    .onErrorResumeNext(Observable.empty())
                    .subscribe({ response ->
                        Log.i("Called subscribe", ":::::::::::+++++++++++++++ GONE")
                        progressBar.visibility = View.GONE
                        if (response.isSuccessful) {
                            val units = response.body()
                            val searchedDatasets = units?.dataset
                            if (searchedDatasets?.size!! > 0) {
                                val searchAdapter = SearchAdapter(this@MapActivity, searchedDatasets, false)
                                listSearch.visibility = View.VISIBLE
                                listSearch.adapter = searchAdapter
                            } else {
                                toast("No items found !!!")
                            }
                        } else {
                            apiError = ErrorUtils.parseError(response)
                            toast(apiError.msg)
                        }
                    }, { t: Throwable? ->
                        progressBar.visibility = View.GONE
                        toast(t?.message.toString())
                    }))

Любая идея, помощь, предложение будет оценено.Заранее спасибо.

1 Ответ

0 голосов
/ 20 декабря 2018

Поток, ошибки которого прекращаются.Вы можете retry() подписку, но это должно быть сделано только условно.Может быть, с таймаутом, может быть только несколько раз, может быть, только на определенных ошибках.

В вашем случае вы должны рассмотреть обработку ошибки вызова API в switchMap.Таким образом, ошибка не достигает основного потока.

.switchMap { searchTerm ->
    runOnUiThread { progressBar.visibility = View.VISIBLE }
    apiManager.getSearchUnits(searchTerm)
          .onErrorResumeNext(Observable.empty())
}
...