Извините за поздний ответ.
Во-первых, у вас есть проблема в вашем коде, и вы рассматриваете способ обработки исключения из реактивного потока.В основном вы имеете дело с декларативным обработчиком, который обрабатывается совершенно по-другому.В вашем коде метод receive
будет вызываться только один раз при запуске и инициализации.Таким образом, генерирование исключения из него совсем не равно исключению, генерируемому во время обработки потока, для которого и был разработан механизм обработки ошибок, о котором вы запрашиваете.Но.,.
Это в сторону.,.
С введением модели программирования Spring Cloud мы рассматриваем возможность одновременного переключения внимания с реактивного модуля, поскольку Spring Cloud Function уже обеспечивает поддержку модели реактивного программирования.Поэтому рассмотрим следующее:
@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveStreamSinkApplication.class,
"--spring.cloud.stream.function.definition=myconsumer");
}
@Bean
public Consumer<Flux<String>> myconsumer() {
return stream -> stream.subscribe(value -> {
if ("foo".equals(value)) {
throw new RuntimeException("BOOM!");
}
System.out.println("Received value: " + value);
});
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
// log the error msg
System.out.println("Handling ERROR: " + message);
}
}
Попробуйте и дайте нам знать.