При настройке JmsTemplate с SESSION_TRANSACTED исключается только одно сообщение - PullRequest
0 голосов
/ 29 августа 2018

Я пытаюсь настроить JmsTemplate транзакционным образом, поэтому, когда происходит ошибка во время обработки или записи, я хочу, чтобы сообщения снова были видны в очереди. У меня есть следующие конфигурации, которые, кажется, работают нормально, когда происходит ошибка, то есть, если ошибка возникает во время записи или обработки, сообщения «отправляются» обратно в очередь. Но проблема в том, что когда все в порядке (т.е. не выдается никаких ошибок), тогда только одно сообщение используется, обрабатывается и удаляется из очереди. Поэтому, если у меня в очереди 10 сообщений, только одно из них удаляется из очереди, и задание помечается как ЗАВЕРШЕННОЕ. Если я настраиваю JmsTemplate с

jmsTemplate.setSessionTransacted(false);

У меня есть все сообщения, обработанные и написанные, но, конечно, я теряю транзакционную манеру. Вопрос в том, как получить все транзакции, которые они будут использовать, обрабатывать и записывать?

У меня есть следующая конфигурация JmsTemplate

@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(applicationProperties.getQueue());
    jmsTemplate.setDefaultDestination(queue());
    jmsTemplate.setReceiveTimeout(applicationProperties.getReceiveTimeout());
    jmsTemplate.setSessionTransacted(true);
    jmsTemplate.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
    return jmsTemplate;
}

, который передается моему JmsItemReader

@Bean
public ItemReader<MyPojoClass> myJmsItemReader() throws Exception {

    JmsItemReader<MyPojoClass> myJmsItemReader= new JmsItemReader<>();
    myJmsItemReader.setItemType(MyPojoClass.class);
    myJmsItemReader.setJmsTemplate(jmsTemplate);
    myJmsItemReader.afterPropertiesSet();
    return myJmsItemReader;

}

и ниже моя конфигурация тасклета

@Bean(name = "receiveProcessAndWrite")
public TaskletStep receiveProcessAndWrite() throws Exception {

    return this.stepBuilderFactory.get("receiveProcessAndWrite")
            .transactionManager(transactionManager)
            .<MyPojoClass, MyPojoClass>chunk(1000)
            .readerIsTransactionalQueue()
            .reader(myJmsItemReader())
            .processor(myprocessor())
            .writer(updateStatusInDBandWrite())
            .listener(new ChunkCountListener())
            .build();

}
...