Как обрабатывать исключения, возникающие при подписке на процессоры в Project Reactor - PullRequest
0 голосов
/ 03 мая 2019

Рассмотрим следующий тест:

@Test
void test() {
    DirectProcessor<Object> objectTopicProcessor = DirectProcessor.create();

    Runnable r = mock(Runnable.class);

    objectTopicProcessor.subscribe(next -> {throw new RuntimeException("eee");});
    objectTopicProcessor.subscribe(next -> r.run());

    assertThrows(RuntimeException.class, () -> objectTopicProcessor.onNext("")); // exception is thrown

    verify(r).run(); // it's not run
}

Представьте, что я создаю API, в котором я предоставляю процессор клиенту. Когда у кого-то есть несколько подписок, и одна из них выдает исключение, другие вызовы не выполняются. Кроме того, исключение распространяется и выбрасывается из objectTopicProcessor.onNext(""). Я бы хотел предотвратить такое поведение.

Я знаю, что клиент может заключить свой код в try-catch внутри подписки, но есть ли другой способ? Иногда, например, может произойти NullPointer или клиент может забыть о проверке исключения. Для API также неудобно заставлять клиентов пытаться перехватить все исключения.

Каковы лучшие стратегии для обработки таких случаев?

1 Ответ

0 голосов
/ 09 мая 2019

В этом примере код, переданный методу subscribe, по умолчанию выполняется в главном потоке.Сначала он встречает исключение и сразу завершается неудачей, не выполняя второй subscribe блок.
Для достижения параллелизма используйте метод .publishOn(scheduler):

@Test
void test() {
    DirectProcessor<Object> processor = DirectProcessor.create();
    Flux<Object> flux = processor.publishOn(Schedulers.parallel());
    Runnable r = mock(Runnable.class);
    flux.subscribe(next -> {throw new RuntimeException("eee");});
    flux.subscribe(next -> r.run());

    processor.onNext(""); // onNext no longer throws an exception

    verify(r, timeout(1000)).run();
}
...