Spring Cloud Stream Reactive - Как сделать обработку ошибок в случае реактивного потокового конвейера? - PullRequest
0 голосов
/ 23 ноября 2018

Как сделать обработку ошибок для реактивного потока .Как

  • Обработка ошибок приложения (например: errorChannel)
  • Обработка системных ошибок (работа с DLQ, повторная обработка и т. Д.)

В текущей документации описывается только ошибкаобработка для нереактивного трубопровода.https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_handling

Облачный поток Spring довольно прост для пользователей, предоставляя простые настройки для сценариев обработки ошибок.Было бы замечательно, если бы такие же сценарии обработки ошибок (с такими же настройками) работали и для конвейера реактивного потока .Примеры использования и соответствующие подробности конфигурации приведены ниже:

  • @ StreamListner ("errorChannel") для глобальной обработки ошибок
  • @ KafkaListener (id = "bar", themes = "реактивный поток-ошибка-тема")
  • Конфигурации для DLQ и выдача ошибочных сообщений в темы ошибок spring.cloud.stream.kafka.bindings.input.consumer.enableDlq =true spring.cloud.stream.kafka.bindings.input.consumer.dlqName = activtive-stream-error-topic

Пример из документации отлично работает с spring-cloud-stream но то же самое выдает ошибку для реактивного конвейера .Любые рекомендации в этом направлении будут очень полезны для сообщества.Заранее спасибо!

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

public static void main(String[] args) {
    SpringApplication.run(ReactiveStreamSinkApplication.class, args);
}

@StreamListener
public void receive(@Input(Sink.INPUT) Flux<String> inputFlux) {
    inputFlux.subscribe(System.out::println);
    throw new RuntimeException("BOOM!");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    // log the error msg
    System.out.println("Handling ERROR: " + message);
}

@KafkaListener(id="bar", topics = "reactive-stream-error-topic")
public void error(String in) {
    System.out.println(in + " from DLQ");
}
}

1 Ответ

0 голосов
/ 29 ноября 2018

Извините за поздний ответ.

Во-первых, у вас есть проблема в вашем коде, и вы рассматриваете способ обработки исключения из реактивного потока.В основном вы имеете дело с декларативным обработчиком, который обрабатывается совершенно по-другому.В вашем коде метод receive будет вызываться только один раз при запуске и инициализации.Таким образом, генерирование исключения из него совсем не равно исключению, генерируемому во время обработки потока, для которого и был разработан механизм обработки ошибок, о котором вы запрашиваете.Но.,.

Это в сторону.,.

С введением модели программирования Spring Cloud мы рассматриваем возможность одновременного переключения внимания с реактивного модуля, поскольку Spring Cloud Function уже обеспечивает поддержку модели реактивного программирования.Поэтому рассмотрим следующее:

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveStreamSinkApplication.class,
        "--spring.cloud.stream.function.definition=myconsumer");
    }

    @Bean
    public Consumer<Flux<String>> myconsumer() {
        return stream -> stream.subscribe(value -> {
            if ("foo".equals(value)) {
                throw new RuntimeException("BOOM!");
            }
            System.out.println("Received value: " + value);
        });
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        // log the error msg
        System.out.println("Handling ERROR: " + message);
    }
}

Попробуйте и дайте нам знать.

...