У меня есть 2 получателя сообщений в долговременной очереди QPID. Эти потребители сообщений создаются с использованием следующего кода (я исключил обработку исключений в приведенном ниже примере, чтобы остаться в теме):
class MessageConsumer {
private Session session;
private Queue queue;
private final MessageListener listener;
private final ConditionProvider condition; //can be used to modify the selection string if needed
public MessageConsumer(ConnectionFactory connectionFactory, String queueName, MessageListener listener, ConditionProvider condition) {
session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue(queueName);
}
//Wakes up every minute and processes messages for the given condition
@Scheduled(fixedRateString = "60000", initialDelay = 1000)
public void listen() {
if (messageConsumer != null) {
messageConsumer.close();
}
messageConsumer = session.createConsumer(queue, condition.getCondition());
messageConsumer.setMessageListener(listener);
}
- listen () вызывается всякий раз, когда я хочу изменить условие, используемое для запроса очереди. В настоящее время я запускаю 2 экземпляра класса MessageConsumer. Предоставляемые условия выбирают сообщения в очереди, которые находятся под определенным пороговым значением и превышают его. Порог - это свойство, установленное в заголовке сообщения. Первый слушатель используется для обработки сообщений, которые не достигли определенного порога (частоты ошибок), а второй слушатель перемещает сообщения, которые достигли порога, в другую очередь. В настоящее время обе эти работы работают как положено.
Пример:
- Потребитель 1 - Дайте мне сообщения, где порог> = 0 и порог <10, и обработайте их. Затем код прослушивателя выполняет следующие действия: если они не могут быть обработаны, он увеличивает пороговое значение и помещает его обратно в очередь. </li>
- Потребитель 2 - Дайте мне сообщения с порогом> 10 и поместите их в другую очередь.
Теперь я хочу добавить дополнительную функцию, то есть принудительное удаление очереди, если я знаю, что причина, по которой сообщения не были обработаны, устранена.
Какие допустимые варианты для этого?
- Имеет ли смысл останавливать существующих потребителей? Если да, как я могу предотвратить запуск @Scheduled и перезапустить @Scheduled после опустошения очереди
- Добавление нового потребителя в ту же очередь и получение всех сообщений? Как я могу запретить существующим потребителям получать эти сообщения?
- Изменить условие для существующих потребителей, чтобы выбрать все сообщения при следующем запланированном запуске listen () и удалить все сообщения? Так или иначе, это более привлекательно для меня, так как оно заботится о первых двух вариантах. (Установите строку запроса для одного из потребителей как нечто, что никогда не будет возвращать никаких сообщений, и задайте строку запроса для другого потребителя, чтобы она возвращала все сообщения в очереди).
Любая помощь приветствуется.