Я не уверен, что-то не так с моим кодом или это ошибка в ядре реактора. Что я пытаюсь сделать, так это обработать такие ошибки, как OptimisticLockingFailureException, которые возникают, когда разные потоки пытаются сохранить одну и ту же запись / объект базы данных в один и тот же момент.
приведенный ниже пример - это просто более простая форма того, что у меня есть в коде и предполагается, что у меня есть основной поток, который содержит множество операций, и подпоток с проблемой, которая у меня есть, которая должна выполнять некоторые вызовы базы данных с использованием реактивного mon go, который представлен здесь как "processFlux ( ) "метод.
Я применяю повторную попытку с помощью onErrorResume () для вызова того же метода изнутри« errorFallBack () », и мой код выглядит следующим образом:
import reactor.core.publisher.Flux;
public class TestClass1 {
public static void main(String[] args) {
Flux.range(1,10)
.doOnNext(System.out::println)
.publish(integerFlux -> processFlux(integerFlux))
.doOnNext(System.out::println)
.onErrorContinue((throwable, o) -> System.out.println("on error continue!!!"))
.subscribe(x -> sleepMillis(100));
}
private static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Flux<Integer> processFlux(Flux<Integer> input) {
return input
.map(integer -> {
if (integer == 5)
throw new NumberFormatException();
return integer;
})
.onErrorResume(NumberFormatException.class, e -> errorFallBack(input));
}
private static Flux<Integer> errorFallBack(Flux<Integer> input) {
return input
.doOnNext(integer -> System.out.println("inside errorFallBack()"))
.flatMap(integer -> processFlux(Flux.just(integer)));
}
}
in метод "processFlux ()", я предполагаю, что в какой-то момент будет сгенерировано исключение, и я пытаюсь отловить его с помощью "onErrorResume ()".
Поскольку я использую "publi sh" () "в основном потоке, чтобы обернуть" processFlux () ", я ожидаю, что когда произойдет ошибка, будет вызвана именно" onErrorResume () "... t Фактическое поведение заключается в том, что вызывается именно «onErrorContinue ()» минимального потока.
это ошибка? или я должен что-то изменить в моем коде?
Я использую реактивный mon go с весенней загрузкой 2.1.13.RELEASE, весенней облачной версией Greenwich.SR5 и версией ядра реактора 3.3.6.RELEASE