Авто истечение срока действия сообщений в Camel - PullRequest
5 голосов
/ 15 января 2012

У меня есть система, реализующая Camel и ActiveMQ для связи между некоторыми серверами. Я хотел бы знать, существует ли способ автоматического истечения срока действия и очистки сообщений, отправленных в очередь через X промежуток времени. Поскольку исходный сервер (заполняющий очередь) не будет знать, что кто-то забирает сообщения, я не хочу, чтобы моя очередь росла до тех пор, пока она не станет настолько большой, что что-то не получится. Бонусная карма указывает на кого-то, кто может помочь и предоставить java dsl способ реализовать эту функцию.

Решение

// expire message after 2 minutes
long ttl = System.currentTimeMillis() + 120000;
// send our info one-way to the group topic
camelTemplate.sendBodyAndHeader("jms:queue:stats", ExchangePattern.InOnly, stats, "JMSExpiration", ttl);

Ответы [ 5 ]

6 голосов
/ 15 января 2012

JMS предоставляет механизм для установки срока действия сообщений.Посмотрите на следующие две ссылки

  1. setJMSExpiration (long expiration) : за сообщение
  2. ActiveMQ: как установить срок действия сообщения : за описание / за сообщение
3 голосов
/ 16 января 2012

Также помните, что часы между клиентом и брокером должны быть синхронизированы, чтобы срок действия работал правильно. Если часы не синхронизированы, то срок действия, установленный от клиента, может уже истечь, когда сообщение получено посредником. Или время клиента опережает брокера, поэтому срок действия превышает 10 секунд.

Это немного лучше, чем истечение срока действия клиента. Таким образом, AMQ предлагает плагин, чтобы исправить это, перераспределяя время, чтобы быть только на основе брокера. Смотри https://activemq.apache.org/timestampplugin.html

1 голос
/ 21 июля 2017

С нашей стороны мы решили добавить срок действия к определенным пунктам назначения, используя верблюжий маршрут, развернутый в самой службе ActiveMQ.

Единственное, что нужно сделать, - это создать файл XML, подобный следующему, с именем, например setJMSExpiration.xml:

<beans xmlns="http://www.springframework.org/schema/beans"  
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="
     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

  <camelContext id="camel-set-expiration" xmlns="http://camel.apache.org/schema/spring">
    <!-- Copy route for each destination to expire -->
    <route id="setJMSExpiration.my.queue.dlq">
      <from uri="broker:queue:MY.QUEUE.DLQ"/>
        <setHeader headerName="JMSExpiration">
          <!-- Message will expire after 1 day -->
          <spel>#{T(java.lang.System).currentTimeMillis() + 86400000}</spel>
        </setHeader>
      <to uri="broker:queue:MY.QUEUE.DLQ"/>
    </route>
    <route id="setJMSExpiration.another.queue">
      <from uri="broker:queue:ANOTHER.QUEUE"/>
        <setHeader headerName="JMSExpiration">
          <!-- Message will expire after 5 days -->
          <spel>#{T(java.lang.System).currentTimeMillis() + 432000000}</spel>
        </setHeader>
      <to uri="broker:queue:ANOTHER.QUEUE"/>
    </route>
  </camelContext>
</beans>

и импортируйте его в конфигурацию activemq.xml с помощью:

<!-- Add default Expiration (file in the same directory) -->
<import resource="setJMSExpiration.xml"/>

В качестве альтернативы вы также можете указать конкретные для политики назначения , если вы нене нужно, чтобы просроченные сообщения доходили до очереди ActiveMQ.DLQ.

<policyEntry queue="MY.QUEUE.DLQ">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="false" />
    </deadLetterStrategy>
</policyEntry>
<policyEntry queue="ANOTHER.QUEUE">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="false" />
    </deadLetterStrategy>
</policyEntry>

Единственное ограничение этого способа заключается в том, что вы не можете легко использовать подстановочные знаки, как они здесь закодированы (вы можете, но это потребуетсянекоторые адаптации с использованием заголовка назначения JMS на верблюжьем маршруте).

Мы пытаемся позволить производителям определить timeToLive (и принудительно их максимально использовать), но не всегда возможно заставить их изменить ихкод, чтобы минимизировать количество таких маршрутов.

1 голос
/ 15 января 2012

Хорошо, setJMSExpiration (long expiration):

- это то, что вы НЕ ДОЛЖНЫ вызывать, когда являетесь клиентом.См. Мой рассказ об этом на форуме ActiveMQ.

https://apache -qpid-developers.2158895.n2.nabble.com / MRG-Java-JMS-Expiration-td7171854.html

0 голосов
/ 18 сентября 2018

Параметр конечной точки timeToLive (http://camel.apache.org/jms.html) работал для меня как решение описанной проблемы, сообщения удаляются из очереди через 300000 миллисекунд. например "ActiveMQ: Q.QUEUE disableReplyTo = истина & TimeToLive = 300000"

Установка заголовка JMSExpiration, не вызывала автоматического удаления сообщения из очереди в моем случае. Но этот документ https://www.ibm.com/support/knowledgecenter/en/SSAW57_9.0.0/com.ibm.websphere.nd.multiplatform.doc/ae/rwsf_prjms_timetolive.html дал мне подсказку.

...