Я заметил разницу в реактивном и нереактивном поведении Spring Cloud Stream.Когда я использую нереактивный подход, сообщения читаются одно за другим.Ниже приведен код для прослушивателя потока:
@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
Thread.sleep(2000);
System.out.println(input.toUpperCase());
}
Этот код принимает сообщения одно за другим.Я вижу это на сайте управления RabbitMQ:
После перезапуска приложение продолжает получать сообщения с точки, в которой оно завершило работу до перезапуска.Но при использовании службы прослушивания реактивного потока все сообщения немедленно извлекаются из очереди.После перезапуска приложения оно не продолжает обрабатывать сообщения, поскольку очередь пуста.
А вот прослушиватель реактивного потока перерезан:
@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
input
.delayElements(Duration.ofSeconds(2))
.map(String::toUpperCase)
.doOnEach(System.out::println)
.subscribe();
}
Я хочу понять, почему это происходит, и можно ли остановить чтение всей очереди перед обработкой предыдущих элементов.Это мое неправильное понимание операторов Reactor или ожидаемое поведение?Можно ли как-то указать противодавление?