ActiveMQ: как обрабатывать отказы брокера при использовании временных очередей - PullRequest
52 голосов
/ 22 июня 2011

В моих приложениях JMS мы используем временные очереди в Producers, чтобы иметь возможность получать ответы от пользовательских приложений.

Я сталкиваюсь с той же проблемой на моем конце, как упомянуто в этой теме: http://activemq.2283324.n4.nabble.com/jira-Created-AMQ-3336-Temporary-Destination-errors-on-H-A-failover-in-broker-network-with-Failover-tt-td3551034.html#a3612738

Всякий раз, когда я перезапускал произвольного посредника в своей сети, я получал много ошибок, подобных этой, в журнале моего приложения-потребителя при попытке отправить ответ во временную очередь:

javax.jms.InvalidDestinationException:
  Cannot publish to a deleted Destination: temp-queue://ID:...

Тогда я увидел ответ Гэри, в котором предлагалось использовать

jms.watchTopicAdvisories=false

в качестве параметра url на клиенте brokerURL. Я быстро изменил URL своего клиентского брокера с помощью этого дополнительного параметра. Однако теперь я вижу подобные ошибки, когда перезагружаю своих брокеров в сети для этого тестирования отработки отказа:

javax.jms.JMSException: 
  The destination temp-queue:
    //ID:client.host-65070-1308610734958-2:1:1 does not exist.

Я использую версию ActiveMQ 5.5. И URL моего клиентского брокера выглядит так:

failover:(tcp://amq-host1:61616,tcp://amq-host2.tred.aol.com:61616,tcp://amq-host3:61616,tcp://amq-host4:61616)?jms.useAsyncSend=true&timeout=5000&jms.watchTopicAdvisories=false

Кроме того, вот мой configm activemq для одного из 4 брокеров: amq1.xml

Может кто-то здесь, пожалуйста, посмотрите на эту проблему и предложите мне, какую ошибку я делаю в этой настройке.

Обновление:

Чтобы уточнить, как я делаю запрос-ответ в моем коде:

  1. Я уже использую назначение для каждого производителя (то есть временную очередь) и задаю его в заголовке для каждого сообщения.
  2. Я уже отправляю уникальный идентификатор корреляции для каждого сообщения в заголовке JMSCorrelationID.
  3. Насколько я знаю, даже Camel и Spring также используют временную очередь для механизма запроса-ответа. Единственное отличие состоит в том, что реализация Spring JMS создает и уничтожает временную очередь для каждого сообщения, тогда как я создаю временную очередь на время существования производителя. Эта временная очередь уничтожается при закрытии приложения клиента (производителя) или посредником AMQ, когда он понимает, что к этой временной очереди не подключен активный производитель.
  4. Я уже устанавливаю срок действия каждого сообщения на стороне источника, чтобы сообщение не задерживалось в очереди слишком долго (60 секунд).

Ответы [ 2 ]

25 голосов
/ 26 января 2012

Существует атрибут посредника, org.apache.activemq.broker.BrokerService # cacheTempDestination, который должен помочь в случае сбоя: case. Установите для этого параметра значение true в конфигурации xml, и временное назначение не будет немедленно удалено при отключении клиента. Быстрое переключение при сбое: повторное подключение сможет произвести и / или потребить из временной очереди снова.

Существует задание таймера, основанное на timeBeforePurgeTempDestination (по умолчанию 5 секунд), которое обрабатывает удаление кэша.

Одно замечание: я не вижу никаких тестов в activemq-core, которые бы использовали этот атрибут, поэтому я не могу дать вам никаких гарантий по этому.

9 голосов
/ 25 января 2012

Временные очереди создаются на посреднике, к которому подключается запрашивающий (производитель) в вашем сценарии запрос-ответ. Они создаются из javax.jms.Session, поэтому при отключении сеанса либо из-за отключения клиента, либо из-за сбоя / сбоя брокера эти очереди окончательно исчезают. Ни один из других брокеров не поймет, что имеется в виду, когда один из ваших потребителей пытается ответить на эти очереди; отсюда ваше исключение.

Это требует архитектурного изменения мышления, предполагающего, что вы хотите справиться с отказоустойчивостью и сохранить все свои сообщения. Вот общий способ, которым вы могли бы атаковать проблему:

  1. Ваши заголовки ответа должны ссылаться на очередь, специфичную для процесса запроса: например, queue:response.<client id>. Идентификатор клиента может быть стандартным именем, если у вас ограниченное количество клиентов, или UUID, если у вас их много.
  2. Исходящее сообщение должно устанавливать идентификатор корреляции (просто строка, которая позволяет связать запрос с ответом - запросчики в конце концов могут сделать более одного запроса одновременно). Это устанавливается в заголовке JMSCorrelationID и должно быть скопировано из запроса в ответное сообщение.
  3. Запрашивающему необходимо настроить прослушиватель в этой очереди, который будет возвращать тело сообщения запрашивающему потоку на основе этого идентификатора корреляции. Для этого нужно написать многопоточный код, так как вам нужно будет вручную управлять чем-то вроде карты идентификаторов корреляции с исходными потоками (возможно, через Futures).

Этот подход аналогичен Apache Camel для запрос-ответ через обмен сообщениями .

Одна вещь, о которой следует помнить, это то, что очередь не исчезнет, ​​когда это сделает клиент, поэтому вы должны установить время жизни ответного сообщения, чтобы оно удалялось из посредника, если оно не было использовано, в противном случае Вы получите отставание от неиспользованных сообщений. Вам также потребуется настроить стратегию очереди недоставленных писем, чтобы автоматически отбрасывать просроченные сообщения .

...