Я использую io.projectreactor 3 (ядро реактора 3.2.6.RELEASE), и я заметил некоторые несоответствия, касающиеся обработки ошибок.К сожалению, официальная документация не дает достаточно подробностей для решения моих проблем.
У меня есть следующие 4 фрагмента.В некоторых случаях исключение будет игнорироваться, а в других случаях оно будет добавлено.Как на самом деле генерировать и потреблять исключения?
Фрагмент 1
В этом случае исключение будет проигнорировано, и main () завершится без получения исключения.
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
}).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Вывод:
DONE
Фрагмент 2
Аналогичен примеру выше, за исключением того, что мы не используем Flux.pushно флюс. просто.Main () получит исключение.
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.just(
1
).doOnNext(e -> {
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Выход:
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
Фрагмент 3
Мы сообщаем об исключении, вызывая sink.error.Main () не получит исключение.
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
sink.error(new RuntimeException("HELLO WORLD"));
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Вывод:
1
2
DONE
Фрагмент 4
Мы генерируем исключение напрямую.Main () получит исключение.
import reactor.core.publisher.Flux;
class Scratch {
public static void main(String[] args) throws Throwable {
Flux.push(sink -> {
sink.next(1);
sink.next(2);
throw new RuntimeException("HELLO WORLD");
}).subscribe(System.out::println, e -> {
throw new RuntimeException(e);
});
System.out.println("DONE");
}
}
Выход
1
2
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: HELLO WORLD
at Scratch.lambda$main$1(scratch_15.java:10)
...
Как правильно обрабатывать исключение при работе с реактивным ядром?Единственный надежный способ, по-видимому, вообще не использовать функцию обратного вызова с ошибкой, а вместо этого использовать объемный поток подписаться с помощью try / catch.Но в этом случае я всегда получаю UnsupportedOperationException
вместо исходного исключения, а затем мне нужно использовать Exceptions.isErrorCallbackNotImplemented
, чтобы проверить, происходит ли оно из реактивного, извлечь вложенное исключение и затем выбросить его.
Это можно сделатьконечно, но это должно быть сделано последовательно на каждом месте, где мы используем Flux, на который подписаны.Это не выглядит хорошо для меня.Что мне здесь не хватает?