ActiveMQ RedeliveryPolicy не устанавливается - PullRequest
0 голосов
/ 23 октября 2018

Я использую:

  • SpringBoot 2.0.4
  • ActiveMQ 5.15.5
  • Apache Camel 2.22.0
  • Java 1.8
  • Groovy
  • Maven

По сути, у меня есть приложение SpringBoot с маршрутом Apache Camel, которое принимает сообщения от ActiveMQ с транзакциями.Мне нужно установить RedeliveryPolicy на ActiveMQ, поэтому, когда возникает ошибка при обработке, сообщение повторяется несколько раз.

Я создал класс конфигурации с bean-компонентами для ActiveMQ, транзакции работают, как и предполагалось, ноRedeliveryPolicy не работает.Кто-нибудь может помочь мне понять, что с этим не так?

Вот вывод журнала для сообщения, которое выдает ошибку:

2018-10-23 10: 35: 28.005 DEBUG 10524 --- [mer [entryQueue]] oacsspi.TransactionErrorHandler: Начало транзакции (0x35d60381) доставлено (false) для (MessageId: ID: EPIC-LAP-25-50304-1540306817804-4: 3: 1: 1: 2 для ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1)) 2018-10-23 10: 35: 28.020 DEBUG 10524 --- [mer [entryQueue]] o.apache.camel.processor.SendProcessor: >>>> direct: // middle Exchange [ID-EPIC-LAP-25-1540312510586-0-1] 2018-10-23 10: 35: 28.375 DEBUG 10524 --- [mer [entryQueue]] oacamel.processor.DefaultErrorHandler: сбой доставки для (Идентификатор сообщения: ID: EPIC-LAP-25-50304-1540306817804-4: 3: 1: 1: 2 для ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1).При попытке доставки: 0 перехвачено: java.lang.RuntimeException: ExceptionTest: Ошибка заказа 2018-10-23 10: 35: 28.390 ОШИБКА 10524 --- [mer [entryQueue]] oacamel.processor.DefaultErrorHandler: сбой доставки для (MessageId:ID: EPIC-LAP-25-50304-1540306817804-4: 3: 1: 1: 2 для ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1).Исчерпаны после попытки доставки: 1 пойман: java.lang.RuntimeException: ExceptionTest: Сбой заказа

Вот мой класс конфигурации для ActiveMQ:

import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.RedeliveryPolicy
import org.apache.activemq.camel.component.ActiveMQComponent
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.JmsTransactionManager

import javax.jms.DeliveryMode

@Configuration
class ActiveMQConfiguration {

    @Bean
    ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory()
        activeMQConnectionFactory.brokerURL = 'tcp://localhost:61616'
        activeMQConnectionFactory.userName = 'admin'
        activeMQConnectionFactory.password = 'admin'

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy()
        redeliveryPolicy.maximumRedeliveries = 3
        redeliveryPolicy.redeliveryDelay = 150L
        redeliveryPolicy.useExponentialBackOff = true
        redeliveryPolicy.backOffMultiplier = 1.5

        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy)

        activeMQConnectionFactory
    }

    @Bean
    ActiveMQComponent activeMQComponent(@Qualifier('activeMQConnectionFactory')ActiveMQConnectionFactory activeMQConnectionFactory) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent()
        activeMQComponent.connectionFactory = activeMQConnectionFactory
        activeMQComponent.transacted = true
        activeMQComponent.transactionManager = txManager()
        activeMQComponent.cacheLevelName = 'CACHE_CONSUMER'
        activeMQComponent.lazyCreateTransactionManager = false
        activeMQComponent.deliveryMode = DeliveryMode.PERSISTENT

        activeMQComponent
    }

    @Bean
    JmsTransactionManager txManager(@Qualifier('activeMQConnectionFactory') ActiveMQConnectionFactory activeMQConnectionFactory) {
        JmsTransactionManager txManager = new JmsTransactionManager()
        txManager.connectionFactory = activeMQConnectionFactory
        txManager.rollbackOnCommitFailure = true

        txManager
    }

}

Ответы [ 2 ]

0 голосов
/ 24 октября 2018

Здесь есть два вопроса

1.У вас есть два менеджера транзакций

Из-за следующих двух строк в вашей конфигурации компонента Camel ActiveMQ вы настраиваете два менеджера транзакций.Это является источником проблем.

activeMQComponent.transacted = true // activates local JMS transactions
activeMQComponent.transactionManager = txManager() // additional tx manager

, если вы просто хотите использовать транзакции из ActiveMQ, вам не нужно настраивать диспетчер транзакций Spring .

Этих двух строк вашей конфигурации достаточно для получения локальных транзакций с вашим брокером ActiveMQ.

activeMQComponent.transacted = true
activeMQComponent.lazyCreateTransactionManager = false

Так что вам следует удалить эту строку, а также весь компонент txManager

activeMQComponent.transactionManager = txManager()

Если вы в настоящее время устанавливаете флаг транзакции в своих маршрутах на верблюде, вы должны также удалить это.И, как я писал, ваши маршруты, использующие ActiveMQ, все еще обрабатываются, даже если вы удалите все это.

2.Повторная доставка не работает

Вы не опубликовали свои маршруты Camel, но в соответствии с выводом ошибки я предполагаю, что брокер не выполняет повторную доставку, поскольку ошибка обрабатывается Camel .

Это обработчик ошибок Camel o.a.camel.processor.DefaultErrorHandler, который срабатывает, когда возникает ошибка, и поскольку она обрабатывает ошибку, сообщение передается брокеру и, следовательно, повторная доставка не происходит.

Попробуйте отключить обработку ошибок Camel, чтобы проверить, доставляет ли брокер сообщения об ошибках.

errorHandler(noErrorHandler());
0 голосов
/ 23 октября 2018

Не так давно у меня были проблемы с очередями dlq - не все параметры, заданные в коде, работали.Мне пришлось добавить настройки в конфиги acitvemq.Да, разделять конфиги не очень хорошее решение, но другого я не нашел.Ниже приведен мой класс конфигурации для jms и пример конфигурации очереди через activemq.xml:

@Configuration
@EnableJms
public class JmsConfig {

    private Environment env;

    @Autowired
    public void setEnv(Environment env) {
        this.env = env;
    }

    @Bean(name = "activemq")
    public ActiveMQComponent activemq(@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager,
                                      @Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setTransactionManager(jmsTransactionManager);
        activeMQComponent.setConnectionFactory(connectionFactory);
        return activeMQComponent;
    }

    @Bean(name = "activemqJmsTemplate")
    public JmsTemplate jmsTemplate(@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory);
        return template;
    }

    @Bean(name = "activemqTransactionPolicy")
    public SpringTransactionPolicy activemqTransactionPolicy(
            @Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager) {
        SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(jmsTransactionManager);
        springTransactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return springTransactionPolicy;
    }

    @Bean(name = "activemqTransactionManager")
    public JmsTransactionManager activemqTransactionManager(
            @Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }

    @Bean(name = "activemqConnectionFactory")
    public ConnectionFactory connectionFactory(@Qualifier("activemqRedeliveryPolicy") RedeliveryPolicy redeliveryPolicy) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://" + env.getProperty("queue.url"));
        connectionFactory.setTrustAllPackages(true);

        RedeliveryPolicyMap map = connectionFactory.getRedeliveryPolicyMap();
        map.put(new ActiveMQQueue("queueName.DLQ"), redeliveryPolicy);
        return connectionFactory;
    }

    @Bean(name = "activemqRedeliveryPolicy")
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(0);
        return redeliveryPolicy;
    }
}

Изменения в activevq.xml:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <!--set dead letter queue for our queue. It name will be "myQueueName.DLQ"-->
            <policyEntry queue="myQueueName">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="" queueSuffix=".DLQ"/>
                </deadLetterStrategy>
            </policyEntry>
            <policyEntry topic=">">
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
    <redeliveryPolicyMap>
        <redeliveryPolicyMap>
            <redeliveryPolicyEntries>
                <!--Set the redelivery delay to one hour-->
                <redeliveryPolicy queue="myQueueName.DLQ" maximumRedeliveries="-1" redeliveryDelay="3600000"/>
            </redeliveryPolicyEntries>
        </redeliveryPolicyMap>
    </redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
...