У меня есть проект Spring Integration, в котором я отправляю и получаю сообщения из очереди RabbitMQ.
Порядок, в котором система публикует сообщения, в порядке, но порядок, в котором она впоследствии получает сообщения, неверен.
Итак, я нашел этот абзац (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp -strict-ordering ) и настроил слушателя следующим образом: simpleMessageListenerContainer.setPrefetchCount(1);
.
У нас было несколько тестов, и он хорошо работал. Однако примерно через неделю он начал вызывать похожие проблемы с упорядочением.
Позвольте мне объяснить немного подробнее:
У меня есть два потока (IntegrationFlow
s) в одном приложении интеграции пружин.
В первом IntegrationFlow
он создает сообщения и публикует каждое сообщение в очереди кроликов.
Непосредственно перед публикацией оно регистрирует каждое сообщение, и я могу подтвердить, что sequenceNumber
увеличивается, как и ожидалось (в моем случае 1,2,3,4,5,6,7,8,9,10, 11).
Затем во втором потоке расходуются эти опубликованные сообщения. Сразу после получения каждого сообщения поток снова регистрирует его. Здесь я обнаружил, что sequenceNumber
не увеличивается, как ожидалось (в моем случае 1,3,5,7,2,4,6,8,9,10,11).
Это очень для этого приложения важно обрабатывать сообщения в правильном порядке.
Когда я посмотрел на пользовательский интерфейс кролика, я обнаружил следующее (большинство из них - то, что я ожидаю):
- кролик имеет 3 соединения (для приложений 3 java)
- соединение для моего приложения имеет 3 канала. 2 из них бездействуют / не имеют потребителей, 1 имеет 6 подписчиков и количество предварительных выборок равно 1.
- каждый подписчик имеет количество предварительных выборок 1
- . Я обеспокоен только одним из этих подписчиков (очередь).
- эта очередь имеет свойства «ack required», а не «exclusive».
Я не ожидал, что в моем приложении будет 3 канала. Я не настраивал это сам, может быть, Spring Integration / AMQP сделал это для меня.
Теперь я думаю, что другой канал может стать активным, и это вызывает проблему упорядочения. Но я не могу найти это в журнале. И не в конфигурации.
Куски кода:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
final Jackson2JsonMessageConverter jackson2MessageConverter,
final MethodInterceptor retryInterceptor) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
// force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
simpleMessageListenerContainer.setPrefetchCount(1);
simpleMessageListenerContainer.setConcurrency();
return simpleMessageListenerContainer;
}
@Bean
public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
final Queue q1, final Queue q2, final Queue q3,
final Queue q4, final Queue q5,
final Queue q6) {
simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
return IntegrationFlows.from(
Amqp.inboundAdapter(simpleMessageListenerContainer)
.messageConverter(jackson2MessageConverter))
.log(LoggingHandler.Level.DEBUG, "com.my.thing")
.headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
.route(router())
.get();
}
private HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
router.setResolutionRequired(false);
router.setDefaultOutputChannelName("errorChannel");
return router;
}
publi sh:
@Bean
public IntegrationFlow prepareForUpload(final Handler1 handler1) {
BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
binaryFileSplitter.setChunkSize(chunksize);
return IntegrationFlows
.from(aFlow)
.handle(handler1)
.split(binaryFileSplitter)
.log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
// Send message to the correct AMQP queue after successful processing
.enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
.channel(MyChannels.AMQP_OUTPUT)
.get();
}
@Bean
public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
.log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
.handle(updateDb)
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
.routingKeyExpression("headers['queueRoutingKey']"))
.get();
}
получить:
@Bean
public IntegrationFlow handleReceivedMessages() {
return IntegrationFlows
.from(Q4_CHANNEL)
.log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
.handle(..)
.aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
.transform(..)
....(..)..
...