Spring Cloud Stream - Функции - Как вручную подтвердить сообщение rabbitmq? - PullRequest
1 голос
/ 14 апреля 2020

Я использую весенний облачный поток с rabbitbinder.

Используя @StreamListener, я мог бы вручную подтверждать сообщения rabbitmq, вставив Channel и deliveryTag в метод следующим образом:

 @StreamListener(target = MySink.INPUT1)
 public void listenForInput1(Message<String> message,
      @Header(AmqpHeaders.CHANNEL) Channel channel,
      @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {

    log.info(" received new message [" + message.toString() + "] ");
    channel.basicAck(deliveryTag, false);
 }

Я сейчас пытаюсь добиться того же, используя функции:

 @Bean
 public Consumer<Message<String>> sink1() {
    return message -> {
      System.out.println("******************");
      System.out.println("At Sink1");
      System.out.println("******************");
      System.out.println("Received message " + message.getPayload());
    };
  }

Как мне получить объект Channel, чтобы я мог подтвердить его с помощью deliveryTag? Я могу получить заголовки формы тега доставки. Однако я не могу получить объект канала.

1 Ответ

2 голосов
/ 14 апреля 2020

Я смог понять это:

  @Bean
  public Consumer<Message<String>> sink1() {
    return message -> {
      System.out.println("******************");
      System.out.println("At Sink1");
      System.out.println("******************");
      System.out.println("Received message " + message.getPayload());

      Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
      Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

      try {
        channel.basicAck(deliveryTag, false);
      } catch (IOException e) {
        e.printStackTrace();
      }
    };
  }
...