ActiveMQ, как повторно отправить / повторить сообщения DLQ программно - PullRequest
0 голосов
/ 10 февраля 2020

Я хотел бы создать простой фрагмент кода, который извлекает все сообщения из DLQ и повторно отправляет их в исходное место назначения (повторная отправка / повтор AKA)

Это можно легко сделать с помощью пользовательского интерфейса ActiveMQ ( но для одного сообщения за раз).

Ответы [ 2 ]

1 голос
/ 10 февраля 2020

Не существует прямого JMS API для повторной отправки сообщения из DLQ в его исходную очередь. На самом деле API JMS даже не обсуждает очереди недоставленных сообщений. Это просто соглашение, используемое большинством брокеров для работы с сообщениями, которые не могут быть использованы.

Вам необходимо создать фактического потребителя JMS для получения сообщения из DLQ, а затем создать производителя JMS для отправки. сообщение возвращается в исходную очередь.

Важно использовать режим Session.TRANSACTED, чтобы избежать потенциальной потери или дублирования сообщения.

Если вы используете Session.AUTO_ACKNOWLEDGE и существует проблема между временем приема и отправки сообщения (например, сбой приложения, сбой оборудования и т. Д. c.), То сообщение может быть потеряно из-за факт, что это было уже подтверждено прежде, чем это было успешно отправлено.

Если вы используете Session.CLIENT_ACKNOWLEDGE и существует проблема между временем отправки и подтверждения сообщения, то в конечном итоге сообщение может быть дублировано из-за того, что оно уже было отправлено до того, как оно было успешно подтверждено.

Обе операции должны быть частью транзакции JMS, чтобы работа была атомарной c.

Наконец, я рекомендую вам либо вызвать commit() для транзакции сеанс для каждого отправленного сообщения или после небольшой партии сообщений (например, 10). Учитывая, что вы не знаете, сколько сообщений в DLQ, было бы неразумно обрабатывать каждое сообщение в одной транзакции. Как правило, вы хотите, чтобы транзакция была как можно меньше, чтобы минимизировать окно, во время которого может произойти ошибка, и работу транзакции необходимо будет выполнить снова. Кроме того, чем больше транзакция, тем больше памяти кучи потребуется брокеру для отслеживания работы в транзакции. Помните, что вы можете вызывать commit() в одном сеансе столько раз, сколько захотите. Вам не нужно создавать новый сеанс для каждой транзакции.

0 голосов
/ 11 февраля 2020

После ответа Джастина я вручную реализовал механизм повторения так:

public void retryAllDlqMessages() throws JMSException {

        logger.warn("retryAllDlqMessages starting");

        logger.warn("Creating a connection to {}", activemqUrl);

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("test", "test", activemqUrl);

        HashMap<String, MessageProducer> messageProducersMap = new HashMap<>();
        MessageConsumer consumer = null;

        try (ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
                ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED)) {
            String dlqName = getDlqName();
            logger.warn("Creating a session to {}", dlqName);
            ActiveMQQueue queue = (ActiveMQQueue) session.createQueue(dlqName);
            logger.warn("Starting JMS Connection");
            connection.start();

            logger.warn("Creating a DLQ consumer");
            consumer = session.createConsumer(queue);
            logger.warn("Consumer start receiving");

            Message message = consumer.receive(CONSUMER_RECEIVE_TIME_IN_MS);

            int retriedMessages = 0;
            while (message != null) {
                try {
                    retryMessage(messageProducersMap, session, message);
                    retriedMessages++;
                } catch (Exception e) {
                    logger.error("Error calling retryMessage for message = {}", message);
                    logger.error("Rolling back the JMS transaction...");
                    session.rollback();
                    return;
                }
                message = consumer.receive(CONSUMER_RECEIVE_TIME_IN_MS);
            }
            logger.warn("Consumer finished retrying {} messages", retriedMessages);
            logger.warn("Commiting JMS Transactions of retry");
            session.commit();

        } finally {
            if (!messageProducersMap.isEmpty()) {
                logger.warn("Closing {} messageProducers in messageProducersMap", messageProducersMap.size());
                for (MessageProducer producer : messageProducersMap.values()) {
                    producer.close();
                }
            }
            if (consumer != null) {
                logger.warn("Closing DLQ Consumer");
                consumer.close();
            }
        }
    }

    private void retryMessage(HashMap<String, MessageProducer> messageProducersMap, ActiveMQSession session, Message message) {

        ActiveMQObjectMessage qm = (ActiveMQObjectMessage) message;
        String originalDestinationName = qm.getOriginalDestination().getQualifiedName();
        logger.warn("Retry message with JmsID={} to original destination {}", qm.getJMSMessageID(), originalDestinationName);
        try {
            if (!messageProducersMap.containsKey(originalDestinationName)) {
                logger.warn("Creating a new producer for original destination: {}", originalDestinationName);
                messageProducersMap.put(originalDestinationName, session.createProducer(qm.getOriginalDestination()));
            }
            logger.info("Producing message to original destination");
            messageProducersMap.get(originalDestinationName).send(qm);
            logger.info("Message sent");
        } catch (Exception e) {
            logger.error("Message retry failed with exception", e);
        }
    }
...