Не удается заставить ActiveMQ переслать мои сообщения - PullRequest
7 голосов
/ 20 декабря 2011

У меня есть однопоточный потребитель ActiveMQ, написанный на Java.Все, что я пытаюсь сделать, это получить () сообщение из очереди, попытаться отправить его в веб-службу, и, если это удастся, подтвердить () его.Если вызов веб-службы завершится неудачно, я хочу, чтобы сообщение оставалось в очереди и повторно отправлялось через некоторое время.

Это более или менее работает, за исключением повторной отправки: каждый раз, когда я перезапускаю своего потребителя, оно получаетодно сообщение для каждого, которое все еще находится в очереди, но после неудачной отправки сообщения никогда не отправляются повторно.

Мой код выглядит так:

public boolean init() throws JMSException, FileNotFoundException, IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setInitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true); // ????
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
    destination = session.createQueue(subject); //???

    consumer = session.createConsumer(destination);
    //consumer.setMessageListener(this); // message listener had same behaviour

}

private void process() {
    while(true) {
        System.out.println("Waiting...");
        try {
            Message message = consumer.receive();
            onMessage(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(!client.sendMessage(msg)) {
                System.out.println("Webservice call failed. Keeping message");
                //message.
            } else {
                message.acknowledge();
            }

            if (transacted) {
                if ((messagesReceived % batch) == 0) {
                    System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
                    session.commit();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

В настоящее время я не использую транзакции(может быть, мне следовало бы?).

Я уверен, что упускаю что-то легкое и скоро буду бить себя по лбу, но я не могу понять, как это должно работать.Спасибо!


РЕДАКТИРОВАТЬ: Не могу ответить на это сам, как недостаточно rep:

ОК, после еще нескольких экспериментов оказывается, что транзакции - единственный способ сделать это.Вот новый код:

public boolean init() throws JMSException, FileNotFoundException, IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setInitialRedeliveryDelay(1000L);
    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true);
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
    destination = session.createQueue(subject);

    consumer = session.createConsumer(destination);
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(client.sendMessage(msg)) {
                if(transacted) {
                    System.out.println("Call succeeded - committing message");
                    session.commit();
                }
                //message.acknowledge();
            } else {
                if(transacted) {
                    System.out.println("Webservice call failed. Rolling back message");
                    session.rollback();
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Теперь сообщение отправляется каждые 1000 мс, как указано в Политике повторной доставки.

Надеюсь, это поможет кому-то еще!:)

1 Ответ

4 голосов
/ 07 марта 2012

Вам не нужно использовать транзакции, CLIENT_ACK / Session.recover () также будет работать ...

Сообщения доставляются клиенту, когда происходит любое из следующих событий:

  • Используется транзакционный сеанс и вызывается rollback ().
  • Транзакционный сеанс закрывается до вызова commit.
  • Сеансиспользование CLIENT_ACKNOWLEDGE и Session.recover () вызывается.

см. http://activemq.apache.org/message-redelivery-and-dlq-handling.html

...