Правильный способ объединения двух потоков с независимыми и равными таймаутами в RxJava - PullRequest
3 голосов
/ 01 мая 2020

Я хотел бы знать, как наилучшим образом объединить два потока, каждый из которых имеет независимые тайм-ауты одинаковой длительности в Rx Java, не вызывая ошибку сбоя.

В Rx Java, если вы запустите В следующем коде вы получите аварийное исключение:

val testObs = Single.fromCallable {
    Thread.sleep(10000)
}.subscribeOn(Schedulers.io())
        .timeout(5000L, TimeUnit.MILLISECONDS)
        .timeout(5000L, TimeUnit.MILLISECONDS)
        .test()

testObs.awaitTerminalEvent()

Это потому, что оба эти timeouts выполняются в потоке computation, который является или может быть многопоточный . Один из тайм-аутов завершится успешно и отключит поток, а второй вызовет аварийное исключение:

io.reactivex.exceptions.UndeliverableException: исключение не может быть доставлено потребителю, поскольку оно уже отменил / удалил поток или исключение не было go для начала.

Очевидно, одно очевидное решение состоит в том, чтобы не привязывать два таймаута одинаковой длительности к одному потоку.

Однако давайте представим простой класс Api, который определяет два метода:

  fun getNewMessages: Single<List<Messsage>>
  fun getUserProfileInfo: Single<Profile>

В вашем коде представьте, что у вас есть отдельные Activity s, каждый из которых вызывает эти методы независимо; у вас также есть StartupActivity, который вызывает оба этих метода, но использует Single.zip для объединения каждой из этих операций, так что он может существенно предварительно извлечь данные и настроить приложение и подготовить его для пользователя. до вызова finish() экрана запуска spla sh. Реализация каждого из этих методов использует общий сетевой класс, который применяет стандартное значение .timeout, равное 30 секундам.

К сожалению, если существует проблема с сетью, которая приводит к остановке обоих этих методов за пределами их значения по умолчанию .timeout значений, а Single.zip, который объединяет каждую из этих сетевых операций, составит sh.

. Таким образом, существует рекомендуемый шаблон для объединения нескольких сигналов, каждое из которых имеет независимые значения времени ожидания, которые могут быть равны и, следовательно, срабатывать одновременно? Если мы воспользуемся подходом «не добавлять таймауты, если вы не являетесь конечным потребителем», это приведет к большому количеству вставленных копий .timeout(30, TimeUnit.SECOND) вызовов по всему нашему коду.

Спасибо!

1 Ответ

0 голосов
/ 03 мая 2020

Исключение UndeliverableException охватывает:

java .util.concurrent.TimeoutException: источник не сигнализировал о событии в течение 5000 миллисекунд и был прерван

Вы правы насчет оператора timeout, по умолчанию он выполняется в планировщике вычислений. Объединение их в цепочку с одинаковой продолжительностью приведет к неопределенному результату c. Иногда первый вызовет TimeoutException, иногда второй.

Вы можете избежать использования оператора zip для взлома sh с помощью одного из методов onErrorX и создания специального экземпляра комбинированный объект. Пример:

long CONFIGURED_TIMEOUT = 300L;

Single<String> getNewMessages = Single.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    } catch (Exception e) {
    }
    return "";
}).timeout(CONFIGURED_TIMEOUT, TimeUnit.MILLISECONDS);

Single<Long> getUserProfileInfo = Single.fromCallable(() -> {
    try {
        Thread.sleep(500);
    } catch (Exception e) {
    }
    return 1L;
}).timeout(CONFIGURED_TIMEOUT, TimeUnit.MILLISECONDS);


TestObserver<String> testObs = Single.zip(
        getNewMessages,
        getUserProfileInfo,
        (a, b) -> a + " --- " + b)
        .onErrorResumeNext(Single.just("Special item"))
        .test();

testObs
        .awaitCount(1)
        .assertNoErrors()
        .assertValue("Special item");

Или вы можете использовать Maybe для испускания empty и выполнения некоторой обработки побочных эффектов при получении ошибки:

TestObserver<String> testObs = Maybe.zip(
        getNewMessages.toMaybe(),
        getUserProfileInfo.toMaybe(),
        (a, b) -> a + " --- " + b)
        .doOnError(th -> System.out.println("Do side effect error handling"))
        .onErrorResumeNext(Maybe.empty())
        .test();

Будьте осторожны с ордер doOnError ничего не получит, если вы поставите onErrorResumeNext перед ним.

...