Весенний реактивный поток - неожиданное отключение - PullRequest
0 голосов
/ 09 апреля 2020

Мы используем реактивные потоки Spring Cloud с RabbitMQ.

Появляется Spring Reactive Stream для подтверждения сообщения, как только оно вытягивает его из очереди. Таким образом, любые ошибки необработанных исключений, возникающие во время обработки сообщения, должны обрабатываться в приложении (которое отличается от нереактивного потока, где необработанные исключения могут быть выброшены и сообщение будет отклонено, отправив его, таким образом, в очередь недоставленных сообщений ).

Как мы должны иметь дело с внезапным завершением работы приложения, когда сообщение находится в полете?

Например:

  • Приложение отключает сообщение очереди
  • Приложение помечает сообщение как подтвержденное
  • Приложение начинает обработку сообщения
  • Приложение закрывается до завершения обработки сообщения

Когда это происходит сообщение кажется полностью потерянным, поскольку оно находится вне очереди, но приложение остановлено. Как мы можем восстановить эти сообщения?

1 Ответ

1 голос
/ 10 апреля 2020

Вам необходимо использовать ручные подтверждения и отложить подтверждение до тех пор, пока обработка не будет завершена асинхронно. Для этого вам необходимо использовать все сообщение:

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }
spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

Используйте basicReject для запроса или отправки в DLQ.

...