Как добиться реактивной обработки сообщений в Spring Cloud Stream? Я прочитал о Spring Cloud Function и о том, что я должен использовать их для реактивной обработки, поэтому я создал пример один:
@Bean
public Consumer<Flux<Message<Loan>>> loanProcess() {
return loanMessages ->
loanMessages
.flatMap(loanMessage -> Mono.fromCallable(() -> {
if (loanMessage.getPayload().getStatus() == null) {
log.error("Empty status");
throw new RuntimeException("Loan status is empty");
}
return "Good";
}))
.doOnError(throwable -> log.error("Exception occurred: {}", throwable))
.subscribe(status -> log.info("Message processed correctly: {}", status));
}
Впоследствии я начал думать, в чем разница между вышеуказанной функцией и классом с помощью @StreamListener и использование типов Reactor:
@StreamListener(Sink.INPUT)
public void loanReceived(Message<Loan> message) {
Mono.just(message)
.flatMap(loanMessage -> Mono.fromCallable(() -> {
if (loanMessage.getPayload().getStatus() == null) {
log.error("Empty status");
throw new RuntimeException("Loan status is empty");
}
log.info("Correct message");
return "Correct message received";
}))
.doOnError(throwable -> log.error("Exception occurred: {}", throwable.getClass()))
.subscribe(status -> log.info("Message processed correctly: {}", status));
}
Кроме того, в Spring Webflux я понимаю, что в netty есть несколько потоков, которые обрабатывают запросы (выполняются в событии l oop). Однако я не могу найти документацию о том, как работает модель потоков в Spring Cloud Stream.