Я пытаюсь настроить JmsTemplate транзакционным образом, поэтому, когда происходит ошибка во время обработки или записи, я хочу, чтобы сообщения снова были видны в очереди. У меня есть следующие конфигурации, которые, кажется, работают нормально, когда происходит ошибка, то есть, если ошибка возникает во время записи или обработки, сообщения «отправляются» обратно в очередь.
Но проблема в том, что когда все в порядке (т.е. не выдается никаких ошибок), тогда только одно сообщение используется, обрабатывается и удаляется из очереди.
Поэтому, если у меня в очереди 10 сообщений, только одно из них удаляется из очереди, и задание помечается как ЗАВЕРШЕННОЕ.
Если я настраиваю JmsTemplate с
jmsTemplate.setSessionTransacted(false);
У меня есть все сообщения, обработанные и написанные, но, конечно, я теряю транзакционную манеру.
Вопрос в том, как получить все транзакции, которые они будут использовать, обрабатывать и записывать?
У меня есть следующая конфигурация JmsTemplate
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
jmsTemplate.setDefaultDestinationName(applicationProperties.getQueue());
jmsTemplate.setDefaultDestination(queue());
jmsTemplate.setReceiveTimeout(applicationProperties.getReceiveTimeout());
jmsTemplate.setSessionTransacted(true);
jmsTemplate.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
return jmsTemplate;
}
, который передается моему JmsItemReader
@Bean
public ItemReader<MyPojoClass> myJmsItemReader() throws Exception {
JmsItemReader<MyPojoClass> myJmsItemReader= new JmsItemReader<>();
myJmsItemReader.setItemType(MyPojoClass.class);
myJmsItemReader.setJmsTemplate(jmsTemplate);
myJmsItemReader.afterPropertiesSet();
return myJmsItemReader;
}
и ниже моя конфигурация тасклета
@Bean(name = "receiveProcessAndWrite")
public TaskletStep receiveProcessAndWrite() throws Exception {
return this.stepBuilderFactory.get("receiveProcessAndWrite")
.transactionManager(transactionManager)
.<MyPojoClass, MyPojoClass>chunk(1000)
.readerIsTransactionalQueue()
.reader(myJmsItemReader())
.processor(myprocessor())
.writer(updateStatusInDBandWrite())
.listener(new ChunkCountListener())
.build();
}