Как смоделировать повторную доставку сообщения в сценарии сеанса JMS AUTO_ACKNOWLEDGE? - PullRequest
6 голосов
/ 26 марта 2012

В следующем тесте я пытаюсь смоделировать следующий сценарий:

  1. Очередь сообщений запущена.
  2. Запущен потребитель, предназначенный для сбоя во время обработки сообщения.
  3. Создается сообщение.
  4. Потребитель начинает обработку сообщения.
  5. Во время обработки выдается исключение для имитации ошибки обработки сообщения.Неисправный потребитель останавливается.
  6. Другой потребитель запускается с намерением забрать доставленное сообщение.

Но мой тест не пройден, и сообщение не доставлено новому потребителю.Буду признателен за любые подсказки по этому вопросу.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest  extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void 
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);

            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;

        @Autowired
        public MockTaskProducer mockTaskProducer;

        @Autowired
        public FailingWorker failingWorker;

        @Autowired
        public SucceedingWorker succeedingWorker;

        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);

            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);

            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }

    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }

}

TaskListener.java

public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }

}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

        taskCounter++;

        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}

1 Ответ

7 голосов
/ 27 марта 2012

Очевидно, источник документации, который я искал вчера Создание надежных приложений JMS вводит меня в заблуждение (или я мог неправильно это понять). Особенно этот отрывок:

До тех пор, пока сообщение JMS не будет подтверждено, оно не считается успешно потребляется. Успешное использование сообщения обычно происходит в три этапа.

  1. Клиент получает сообщение.
  2. Клиент обрабатывает сообщение.
  3. Сообщение подтверждено. Подтверждение инициируется либо поставщиком JMS, либо клиентом, в зависимости от сеанса режим подтверждения.

Я предположил, AUTO_ACKNOWLEDGE делает именно это - подтверждает сообщение после того, как метод слушателя возвращает результат. Но в соответствии со спецификацией JMS это немного отличается, и контейнеры прослушивателя Spring, как и ожидалось, не пытаются изменить поведение из спецификации JMS. Это то, что должен сказать Javadoc AbstractMessageListenerContainer - я подчеркнул важные предложения:

Контейнер слушателя предлагает следующее подтверждение сообщения опции:

  • «sessionAcknowledgeMode» установлен в «AUTO_ACKNOWLEDGE» (по умолчанию): Автоматическое подтверждение сообщения перед выполнением слушателя; Нет возврата в случае исключения.
  • "sessionAcknowledgeMode" установлен в "CLIENT_ACKNOWLEDGE": автоматическое подтверждение сообщения после успешного выполнения слушателя; нет доставка в случае исключения.
  • "sessionAcknowledgeMode" установлен в "DUPS_OK_ACKNOWLEDGE": отложенное подтверждение сообщения во время или после выполнения слушателя; потенциал доставка в случае исключения.
  • "sessionTransacted" установлен в "true": подтверждение транзакции после успешного выполнения слушателя; гарантированный возврат в случае исключения.

Итак, ключ к моему решению: listenerContainer.setSessionTransacted(true);

Еще одна проблема, с которой я столкнулся, заключалась в том, что провайдер JMS продолжает возвращать сообщение с ошибкой обратно тому же потребителю, который потерпел неудачу при обработке сообщения. Я не знаю, дает ли спецификация JMS рецепт того, что должен делать провайдер в таких ситуациях, но для меня сработало использование listenerContainer.shutdown();, чтобы отключить отказавшего потребителя и позволить провайдеру повторно доставить сообщение и дать шанс другому потребителю.

...