ActiveMQ: очередь мертвых писем следит за порядком моих сообщений - PullRequest
6 голосов
/ 13 июля 2010

Я использую ActiveMQ в качестве брокера для доставки сообщений. Эти сообщения предназначены для написания в dabatase. Иногда база данных недоступна или недоступна. В этом случае я хочу откатить свое сообщение, чтобы позже повторить попытку и продолжить чтение других сообщений.

Этот код работает нормально, за исключением одного момента: откатное сообщение не позволяет мне прочесть остальные:

private Connection getConnection() throws JMSException {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(3); // will retry 3 times to dequeue rollbacked messages
    redeliveryPolicy.setInitialRedeliveryDelay(5 *1000);  // will wait 5s to read that message

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    ((ActiveMQConnection)connection).setDispatchAsync(true);
    ((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
    ((ActiveMQConnection)connection).setStatsEnabled(true);
    connection.setClientID("myClientID");
    return connection;
}

Я создаю свою сессию так:

session = connection.createSession(true, Session.SESSION_TRANSACTED);

Откат легко задать:

session.rollback();

Давайте представим, что в моей очереди 3 сообщения:

1: ok
2: KO (will need to be treated again : the message I want to rollback)
3: ok
4: ok

Мой потребитель сделает (линейная последовательность):

commit 1 
rollback 2
wait 5s
rollback 2
wait 5s
rollback 2
put 2 in dead letter queue (ActiveMQ.DLQ)
commit 3
commit 4

Но я хочу:

commit 1
rollback 2
commit 3
commit 4
wait 5s
rollback 2
wait 5s
rollback 2
wait 5s
put 2 in dead letter queue (ActiveMQ.DLQ)

Итак, как я могу настроить моего Потребителя так, чтобы позже откладывать откатанные сообщения?

Ответы [ 3 ]

8 голосов
/ 18 марта 2011

Это на самом деле ожидаемое поведение, потому что повторные сообщения обрабатываются клиентом, а не посредником. Таким образом, поскольку у вас есть привязка к 1 сеансу, а ваша политика повторных попыток настроена на 3 повторения до DLQ, весь процесс повторных попыток блокирует этот конкретный поток.

Итак, мой первый вопрос: если вставка базы данных завершится неудачно, разве вы не ожидаете, что все остальные вставки базы данных потерпят неудачу по той же причине?

Если нет, то способ обойти это - установить политику повторных попыток для этой очереди равной 0 повторных попыток с определенным DLQ, чтобы сообщения сразу же не работали и переходили в DLQ. Затем запустите другой процесс, который каждые 5 секунд снимает DLQ, обрабатывает его и / или помещает обратно в основную очередь для обработки.

0 голосов
/ 25 февраля 2014

У меня была такая же проблема, я не нашел здесь решения, поэтому решил опубликовать ее здесь после того, как нашел одну для людей, борющихся с тем же.Это исправлено до версии 5.6, когда для свойства nonBlockingRedelivery было установлено значение true в фабрике соединений:

<property name="nonBlockingRedelivery" value="true" />
0 голосов
/ 13 июля 2010

Используете ли вы <strictOrderDispatchPolicy /> в XML-файле конфигурации ActiveMQ?Я не уверен, повлияет ли это на порядок сообщений для повторной доставки или нет.Если вы используете строгий порядок рассылки, попробуйте закомментировать эту политику, чтобы увидеть, изменит ли это поведение.

Брюс

...