Как работает реактивная обработка Spring Cloud Stream? - PullRequest
0 голосов
/ 03 апреля 2020

Как добиться реактивной обработки сообщений в Spring Cloud Stream? Я прочитал о Spring Cloud Function и о том, что я должен использовать их для реактивной обработки, поэтому я создал пример один:

@Bean
public Consumer<Flux<Message<Loan>>> loanProcess() {
  return loanMessages ->
      loanMessages
          .flatMap(loanMessage -> Mono.fromCallable(() -> {
            if (loanMessage.getPayload().getStatus() == null) {
              log.error("Empty status");
              throw new RuntimeException("Loan status is empty");
            }
            return "Good";
          }))
          .doOnError(throwable -> log.error("Exception occurred: {}", throwable))
          .subscribe(status -> log.info("Message processed correctly: {}", status));
}

Впоследствии я начал думать, в чем разница между вышеуказанной функцией и классом с помощью @StreamListener и использование типов Reactor:

@StreamListener(Sink.INPUT)
public void loanReceived(Message<Loan> message) {
  Mono.just(message)
      .flatMap(loanMessage -> Mono.fromCallable(() -> {
        if (loanMessage.getPayload().getStatus() == null) {
          log.error("Empty status");
          throw new RuntimeException("Loan status is empty");
        }
        log.info("Correct message");
        return "Correct message received";
      }))
      .doOnError(throwable -> log.error("Exception occurred: {}", throwable.getClass()))
      .subscribe(status -> log.info("Message processed correctly: {}", status));
}

Кроме того, в Spring Webflux я понимаю, что в netty есть несколько потоков, которые обрабатывают запросы (выполняются в событии l oop). Однако я не могу найти документацию о том, как работает модель потоков в Spring Cloud Stream.

...