QPID: добавление специального потребителя сообщений для удаления необработанных сообщений в очереди - PullRequest
0 голосов
/ 05 июля 2018

У меня есть 2 получателя сообщений в долговременной очереди QPID. Эти потребители сообщений создаются с использованием следующего кода (я исключил обработку исключений в приведенном ниже примере, чтобы остаться в теме):

class MessageConsumer {
    private Session session;
    private Queue queue;
    private final MessageListener listener;
    private final ConditionProvider condition; //can be used to modify the selection string if needed

    public MessageConsumer(ConnectionFactory connectionFactory, String queueName, MessageListener listener, ConditionProvider condition) {
            session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            queue = session.createQueue(queueName);
    }
//Wakes up every minute and processes messages for the given condition
@Scheduled(fixedRateString = "60000", initialDelay = 1000)
public void listen() {
        if (messageConsumer != null) { 
            messageConsumer.close();
        }               
        messageConsumer = session.createConsumer(queue, condition.getCondition());
        messageConsumer.setMessageListener(listener);
    } 
  • listen () вызывается всякий раз, когда я хочу изменить условие, используемое для запроса очереди. В настоящее время я запускаю 2 экземпляра класса MessageConsumer. Предоставляемые условия выбирают сообщения в очереди, которые находятся под определенным пороговым значением и превышают его. Порог - это свойство, установленное в заголовке сообщения. Первый слушатель используется для обработки сообщений, которые не достигли определенного порога (частоты ошибок), а второй слушатель перемещает сообщения, которые достигли порога, в другую очередь. В настоящее время обе эти работы работают как положено.

Пример:

  • Потребитель 1 - Дайте мне сообщения, где порог> = 0 и порог <10, и обработайте их. Затем код прослушивателя выполняет следующие действия: если они не могут быть обработаны, он увеличивает пороговое значение и помещает его обратно в очередь. </li>
  • Потребитель 2 - Дайте мне сообщения с порогом> 10 и поместите их в другую очередь.

Теперь я хочу добавить дополнительную функцию, то есть принудительное удаление очереди, если я знаю, что причина, по которой сообщения не были обработаны, устранена.

Какие допустимые варианты для этого?

  • Имеет ли смысл останавливать существующих потребителей? Если да, как я могу предотвратить запуск @Scheduled и перезапустить @Scheduled после опустошения очереди
  • Добавление нового потребителя в ту же очередь и получение всех сообщений? Как я могу запретить существующим потребителям получать эти сообщения?
  • Изменить условие для существующих потребителей, чтобы выбрать все сообщения при следующем запланированном запуске listen () и удалить все сообщения? Так или иначе, это более привлекательно для меня, так как оно заботится о первых двух вариантах. (Установите строку запроса для одного из потребителей как нечто, что никогда не будет возвращать никаких сообщений, и задайте строку запроса для другого потребителя, чтобы она возвращала все сообщения в очереди).

Любая помощь приветствуется.

...