Mono.onErrorResume () не всегда работает должным образом - PullRequest
0 голосов
/ 19 июня 2020

Я не уверен, что-то не так с моим кодом или это ошибка в ядре реактора. Что я пытаюсь сделать, так это обработать такие ошибки, как OptimisticLockingFailureException, которые возникают, когда разные потоки пытаются сохранить одну и ту же запись / объект базы данных в один и тот же момент.

приведенный ниже пример - это просто более простая форма того, что у меня есть в коде и предполагается, что у меня есть основной поток, который содержит множество операций, и подпоток с проблемой, которая у меня есть, которая должна выполнять некоторые вызовы базы данных с использованием реактивного mon go, который представлен здесь как "processFlux ( ) "метод.

Я применяю повторную попытку с помощью onErrorResume () для вызова того же метода изнутри« errorFallBack () », и мой код выглядит следующим образом:

import reactor.core.publisher.Flux;


public class TestClass1 {

    public static void main(String[] args) {
        Flux.range(1,10)
                .doOnNext(System.out::println)
                .publish(integerFlux -> processFlux(integerFlux))
                .doOnNext(System.out::println)
                .onErrorContinue((throwable, o) -> System.out.println("on error continue!!!"))
                .subscribe(x -> sleepMillis(100));
    }

    private static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Flux<Integer> processFlux(Flux<Integer> input) {
        return input
                .map(integer -> {
                    if (integer == 5)
                        throw new NumberFormatException();
                    return integer;
                })
                .onErrorResume(NumberFormatException.class, e -> errorFallBack(input));
    }

    private static Flux<Integer> errorFallBack(Flux<Integer> input) {
        return input
                .doOnNext(integer -> System.out.println("inside errorFallBack()"))
                .flatMap(integer -> processFlux(Flux.just(integer)));
    }
}

in метод "processFlux ()", я предполагаю, что в какой-то момент будет сгенерировано исключение, и я пытаюсь отловить его с помощью "onErrorResume ()".

Поскольку я использую "publi sh" () "в основном потоке, чтобы обернуть" processFlux () ", я ожидаю, что когда произойдет ошибка, будет вызвана именно" onErrorResume () "... t Фактическое поведение заключается в том, что вызывается именно «onErrorContinue ()» минимального потока.

это ошибка? или я должен что-то изменить в моем коде?

Я использую реактивный mon go с весенней загрузкой 2.1.13.RELEASE, весенней облачной версией Greenwich.SR5 и версией ядра реактора 3.3.6.RELEASE

...