Вы используете Camel, почему вы "пачкаете руки", программируя под JMS API ?Верблюды JMS компонент делает все это для вас бесплатно.
Просто используйте локальные JMS-транзакции (без внешнего менеджера транзакций) для потребления сообщений, и Camel автоматически фиксирует успешно обработанные сообщения и откатывает сообщения с ошибками обработки (брокер сообщений затем выполняет повторную доставку или перемещаетсообщение для DLQ ).
Ключом для этой настройки является правильная настройка компонента Camel JMS.Для ActiveMQ вы можете сделать это следующим образом
@Bean("activemq")
public ActiveMQComponent createActiveMQComponent(final ConnectionFactory connectionFactory) {
final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
final JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(connectionFactory);
jmsConfiguration.setLazyCreateTransactionManager(false);
jmsConfiguration.setTransacted(true);
jmsConfiguration.setConcurrentConsumers([number of concurrent consumers]);
jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
activeMQComponent.setConfiguration(jmsConfiguration);
return activeMQComponent;
}
Важной частью этого являются эти две конфигурации.
jmsConfiguration.setLazyCreateTransactionManager(false);
jmsConfiguration.setTransacted(true);
Эта конфигурация компонента в сочетании с NO Настроенный диспетчер транзакций Spring предоставляет локальные транзакции вашему брокеру.
Если эта настройка выполнена, вы можете просто написать свои маршруты для приема сообщений из очередей:
from("activemq:queue:myQueue")
...
Вы не должны использовать оператор Camel transacted()
вваши маршруты, потому что это связано с настроенным менеджером транзакций и не применимо для локальных транзакций JMS.Но не волнуйтесь, если вышеописанная настройка верна, ваши потребительские маршруты JMS прекрасно проходят без transacted()
.
Вам также не нужно настраивать свои очереди, как в вашем примере.Вы можете настроить конечные точки очереди в Camel как простые строки, как в моем примере
from("activemq:queue:myQueue")
activemq = component name
queue = consume from a queue (not a topic)
myQueue = queue name
Если вы хотите, вы также можете обрабатывать ошибки обработки с помощью Обработчик ошибок Camel .Но обязательно поймите разницу между повторной доставкой брокера (сообщение отправлено снова) и повторной доставкой верблюда (повторяется только этап обработки).