Могу ли я в @RabbitConsumer выяснить, были ли какие-либо сообщения предварительно выбраны для этого потребителя - PullRequest
0 голосов
/ 27 мая 2019

Мне нужно знать, есть ли еще сообщения, приходящие для этого потребителя.

Сейчас я считаю сообщения в очереди.Но это дает мне только то, что осталось в очереди, а не то, что было предварительно выбрано.

@RabbitListener(queues = QUEUENAME)
    public void recieve(Message message, Channel channel) throws IOException {
   long messagesOnQueue = channel.messageCount(QUEUENAME);
   if(messagesOnQueue>1) {
     //add message to list
   }
   else {
      //save the list
   }
}

Было бы действительно здорово, если бы был способ определить, были ли сообщения предварительно выбраны для этого потребителя.Это возможно?Если я смогу получить этот счет, то мне все равно, есть ли в очереди сообщения.

Получив предложения от Гэри, я изменил реализацию на эту, и она работает.При ручном подтверждении сообщения это должно быть сделано на том же канале, на котором вы получили сообщение.Но вы можете сохранить ссылку на нее, если она вам понадобится в другом потоке.В ваш весенний загрузочный файл application.yml добавьте этот

spring:
  rabbitmq:
    listener:
      direct:
        prefetch: 200
      simple:
        prefetch: 200
        acknowledgeMode: MANUAL

код от потребителя.

//The list we build and save in one transaction
    private Set<PayloadDto> unhandledPayloads = new HashSet<>();
    private long latestTag = 0L;
    private Channel latestChannel;

    @RabbitListener(queues = QUEUE_NAME, id = "consumerId")
    public void recieve(Message message, Channel channel) throws IOException {
        PayloadDto payloadDto = parse(message.getBody());

        unhandledPayloads.add(payloadDto);
        latestTag = message.getMessageProperties().getDeliveryTag();
        latestChannel = channel;
        if (unhandledPayloads.size() > UNHANDLED_PAYLOADS_LIMIT) {
            service.createOrUpdate(unhandledPayloads);
            queue.clear();
            channel.basicAck(latestTag, true);
        }
    }


    @EventListener(condition = "event.listenerId == 'consumerId'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        if(!queue.isEmpty()) {
            service.createOrUpdate(unhandledPayloads);
            queue.clear();
            latestChannel.basicAck(latestTag, true);
        }

    }

Причина, по которой мы пытаемся создать список перед его сохранением, заключается в возможностисделать пакетную вставку, чтобы она работала быстрее.

1 Ответ

1 голос
/ 27 мая 2019

В настоящее время нет, но добавить функцию не составит труда.Откройте вопрос GitHub, чтобы запросить его.Однако я не уверен, насколько это было бы полезно.Если в очереди все еще есть сообщения, при использовании предварительно выбранных сообщений будет получено другое.

...