Мне нужно знать, есть ли еще сообщения, приходящие для этого потребителя.
Сейчас я считаю сообщения в очереди.Но это дает мне только то, что осталось в очереди, а не то, что было предварительно выбрано.
@RabbitListener(queues = QUEUENAME)
public void recieve(Message message, Channel channel) throws IOException {
long messagesOnQueue = channel.messageCount(QUEUENAME);
if(messagesOnQueue>1) {
//add message to list
}
else {
//save the list
}
}
Было бы действительно здорово, если бы был способ определить, были ли сообщения предварительно выбраны для этого потребителя.Это возможно?Если я смогу получить этот счет, то мне все равно, есть ли в очереди сообщения.
Получив предложения от Гэри, я изменил реализацию на эту, и она работает.При ручном подтверждении сообщения это должно быть сделано на том же канале, на котором вы получили сообщение.Но вы можете сохранить ссылку на нее, если она вам понадобится в другом потоке.В ваш весенний загрузочный файл application.yml добавьте этот
spring:
rabbitmq:
listener:
direct:
prefetch: 200
simple:
prefetch: 200
acknowledgeMode: MANUAL
код от потребителя.
//The list we build and save in one transaction
private Set<PayloadDto> unhandledPayloads = new HashSet<>();
private long latestTag = 0L;
private Channel latestChannel;
@RabbitListener(queues = QUEUE_NAME, id = "consumerId")
public void recieve(Message message, Channel channel) throws IOException {
PayloadDto payloadDto = parse(message.getBody());
unhandledPayloads.add(payloadDto);
latestTag = message.getMessageProperties().getDeliveryTag();
latestChannel = channel;
if (unhandledPayloads.size() > UNHANDLED_PAYLOADS_LIMIT) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
channel.basicAck(latestTag, true);
}
}
@EventListener(condition = "event.listenerId == 'consumerId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
if(!queue.isEmpty()) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
latestChannel.basicAck(latestTag, true);
}
}
Причина, по которой мы пытаемся создать список перед его сохранением, заключается в возможностисделать пакетную вставку, чтобы она работала быстрее.