Реактивный Spring Cloud Stream читает все данные из очереди, но нереактивные читает сообщения одно за другим - PullRequest
0 голосов
/ 08 октября 2018

Я заметил разницу в реактивном и нереактивном поведении Spring Cloud Stream.Когда я использую нереактивный подход, сообщения читаются одно за другим.Ниже приведен код для прослушивателя потока:

@StreamListener(Processor.INPUT)
public void receive2(String input) throws InterruptedException {
    Thread.sleep(2000);
    System.out.println(input.toUpperCase());
}

Этот код принимает сообщения одно за другим.Я вижу это на сайте управления RabbitMQ:

enter image description here

После перезапуска приложение продолжает получать сообщения с точки, в которой оно завершило работу до перезапуска.Но при использовании службы прослушивания реактивного потока все сообщения немедленно извлекаются из очереди.После перезапуска приложения оно не продолжает обрабатывать сообщения, поскольку очередь пуста.

А вот прослушиватель реактивного потока перерезан:

@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<String> input) {
    input
            .delayElements(Duration.ofSeconds(2))
            .map(String::toUpperCase)
            .doOnEach(System.out::println)
            .subscribe();
}

enter image description here

Я хочу понять, почему это происходит, и можно ли остановить чтение всей очереди перед обработкой предыдущих элементов.Это мое неправильное понимание операторов Reactor или ожидаемое поведение?Можно ли как-то указать противодавление?

Ответы [ 2 ]

0 голосов
/ 08 октября 2018

Поскольку реактив не является блокирующим;delayElements() передает работу другому потоку;это освобождает поток слушателя, который возвращается в контейнер и получает сообщение;следовательно, следующий доставляется (и задерживается);Итак, все сообщения попадают в очередь планировщика.Reactive действительно не подходит для этого случая использования, если только вы не используете ручные подтверждения, которые показаны ниже.

@StreamListener
public void receive1(@Input(Processor.INPUT) Flux<Message<String>> input) {
    input
            .doOnEach(System.out::println)
            .delayElements(Duration.ofSeconds(2))
            .map(m -> {
                return MessageBuilder.withPayload(m.getPayload().toUpperCase())
                        .copyHeaders(m.getHeaders())
                        .build();
            })
            .doOnEach(System.out::println)
            .doOnNext(m -> {
                try {
                    m.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                        .basicAck(m.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            })
            .subscribe();
}

и

spring.cloud.stream.bindings.input.group=foo
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=manual
0 голосов
/ 08 октября 2018

Нам, вероятно, нужно взглянуть на то, как реактивная система подключается, поскольку она кажется странной.Сказав, что если вы используете последнюю версию Spring-Cloud-Stream, я бы порекомендовал небольшое изменение в вашем приложении и полагаться на поддержку Spring Cloud Function, которая в вашем случае, по сути, предоставляет альтернативную поддержку для реализации обработчиков реагирующих сообщений - https://spring.io/blog/2018/08/28/spring-cloud-stream-fishtown-m2-2-1-0-m2-release-announcement. Единственное изменение, которое вы должны внести в пример в блоге:

@Bean
public Consumer<Flux<String>> foo() {
    //
}
...