Чтобы сохранить порядок сообщений, одним из подходов может быть временная остановка приемника сообщений как часть вашей стратегии отката. Глядя на Spring Boot do c для DefaultMessageListenerContainer
, есть метод stop(Runnable callback)
. Я экспериментировал с использованием этого при откате следующим образом.
Чтобы обеспечить однопоточность моего слушателя, на моем DefaultJmsListenerContainerFactory
я установил containerFactory.setConcurrency("1")
.
В моем слушателе я установил an id
@JmsListener(destination = "DEV.QUEUE.2", containerFactory = "listenerTwoFactory", concurrency="1", id="listenerTwo")
И получить экземпляр DefaultMessageListenerContainer
.
JmsListenerEndpointRegistry reg = context.getBean(JmsListenerEndpointRegistry.class);
DefaultMessageListenerContainer mlc = (DefaultMessageListenerContainer) reg.getListenerContainer("listenerTwo");
Для тестирования я проверяю JMSXDeliveryCount и генерирую исключение для отката.
retryCount = Integer.parseInt(msg.getStringProperty("JMSXDeliveryCount"));
if (retryCount < 5) {
throw new Exception("Rollback test "+retryCount);
}
При обработке улова Listener я вызываю stop(Runnable callback)
в экземпляре DefaultMessageListenerContainer
и передаю новый класс ContainerTimedRestart
, как определено ниже.
//catch processing here and decide to rollback
mlc.stop(new ContainerTimedRestart(mlc,delay));
System.out.println("#### "+getClass().getName()+" Unable to process message.");
throw new Exception();
ContainerTimedRestart
extends Runnable
и DefaultMessageListenerContainer
отвечает за вызов метода run()
после завершения вызова остановки.
public class ContainerTimedRestart implements Runnable {
//Container instance to restart.
private DefaultMessageListenerContainer theMlc;
//Default delay before restart in mills.
private long theDelay = 5000L;
//Basic constructor for testing.
public ContainerTimedRestart(DefaultMessageListenerContainer mlc, long delay) {
theMlc = mlc;
theDelay = delay;
}
public void run(){
//Validate container instance.
try {
System.out.println("#### "+getClass().getName()+"Waiting for "+theDelay+" millis.");
Thread.sleep(theDelay);
System.out.println("#### "+getClass().getName()+"Restarting container.");
theMlc.start();
System.out.println("#### "+getClass().getName()+"Container started!");
} catch (InterruptedException ie) {
ie.printStackTrace();
//Further checks and ensure container is in correct state.
//Report errors.
}
}
Я загрузил в свою очередь три сообщения с полезными данными «a», «b» и «c» соответственно и запустил прослушиватель.
Проверяя DEV.QUEUE.2
в моем диспетчере очередей, я вижу IPPROCS(1)
, подтверждающее, что только один дескриптор приложения имеет открытую очередь. Сообщения обрабатываются по порядку после того, как каждое повторяется пять раз и с 5-секундной задержкой между попытками отката.