Сигнал отката от JMS MessageListener - PullRequest
13 голосов
/ 27 августа 2011

Я работал с JMS и ActiveMQ. Все творит чудеса. Я не использую весну, и я тоже не могу.

Интерфейс javax.jms.MessageListener имеет только один метод, onMessage. Внутри реализации есть вероятность, что будет сгенерировано исключение. Если на самом деле возникает исключение, я говорю, что сообщение не было должным образом обработано и нуждается в повторной попытке. Итак, мне нужен ActiveMQ, чтобы немного подождать, а затем повторить попытку. то есть мне нужно сгенерированное исключение для отката транзакции JMS.

Как мне добиться такого поведения?

Возможно, в ActiveMQ есть какая-то конфигурация, которую я не смог найти.

Или ... возможно, можно было бы покончить с регистрацией MessageListener s для потребителей и потреблять сообщения самостоятельно, в виде цикла:

while (true) {
    // ... some administrative stuff like ...
    session = connection.createSesstion(true, SESSION_TRANSACTED)
    try {
        Message m = receiver.receive(queue, 1000L);
        theMessageListener.onMessage(m);
        session.commit();
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    // ... some more administrative stuff
}

в пару потоков вместо регистрации слушателя.

Или ... я мог бы как-то украсить / AOP / манипулировать байтами MessageListener, чтобы сделать это самостоятельно.

Какой маршрут вы выберете и почему?

note : У меня нет полного контроля над кодом MessageListener s.

EDIT Тест на подтверждение концепции:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        message.acknowledge();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        message.acknowledge();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    brokerService.stop();

    assertEquals(3, atomicInteger.get());
}

Ответы [ 3 ]

12 голосов
/ 27 августа 2011

Если вы хотите использовать SESSION_TRANSACTED в качестве режима подтверждения, вам необходимо установить RedeliveryPolicy в вашей Connection / ConnectionFactory . Эта страница на сайте ActiveMQ также содержит полезную информацию о том, что вам может понадобиться.

Поскольку вы не используете Spring, вы можете настроить RedeliveryPolicy с помощью чего-то похожего на следующий код(взято из одной из приведенных выше ссылок):

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

Редактировать С добавлением к ответу фрагмента кода ниже показано, как это работает с транзакциями.Попробуйте этот код с закомментированным методом Session.rollback (), и вы увидите, что использование SESION_TRANSACTED и Session.commit / rollback работает должным образом:

@Test
public void test() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        session.commit();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        session.rollback();
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();

    assertEquals(3, atomicInteger.get());
}

}

2 голосов
/ 27 августа 2011

Необходимо установить режим подтверждения на Session.CLIENT_ACKNOWLEDGE, клиент подтверждает полученное сообщение, вызывая метод подтверждения сообщения.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Затем, после обработки сообщениянеобходимо удалить метод Message.acknowledge (), чтобы удалить это сообщение.

Message message = ...;
// Processing message

message.acknowledge();
0 голосов
/ 25 декабря 2014

Если ваш сеанс обрабатывается, то в любом случае игнорируется «acceptledgeMode». Итак, просто оставьте сеанс транзакционным и используйте session.rollback и session.commit для подтверждения или отката транзакции.

...