Я пытаюсь отправлять / получать сообщения через входящие / исходящие адаптеры AMQP Spring и сталкиваюсь с этой проблемой .
После нахождения ответа Гари здесь , Я начал исследовать, правильно ли мое приложение устанавливает идентификатор сообщения.На самом деле, он позаботился об автоматическом здесь .
Производитель выглядит как this .
Я специально отправляю неверное сообщение и наКонец потребителя. Я наблюдаю, как происходит сбой преобразователя сообщений здесь .
После этого сообщение снова ставится в очередь и снова бесконечно обрабатывается.
Во время отладки этой проблемыЯ заметил, что ID отправленных и полученных сообщений всегда различается.
После дальнейшей отладки этого события я узнал, что стандартное поле идентификатора, предоставляемое базовой средой обмена сообщениями Spring, помечено как временное и переходные заголовки никогда не отображаются .
Вопросы?
- Почему фреймворк не может автоматически сопоставить
MessageHeaders.ID
с AmqpHeaders.MESSAGE_ID
? - Должен ли я назначить поле настраиваемых заголовков в качестве идентификатора и реализовать
MessageKeyGenerator
? - Кстати, как мне это сделать с новым Java DSL?
Большое спасибо!
Приветствия, Ласло
ОБНОВЛЕНИЕ: причина неудачной обработки сообщения в бесконечном цикле была вызвана чем-то другим.
Мне удалось это исправить, добавив defaultRequeueRejected(false)
кконфигурация контейнера приемника входящего адаптера AMQP.
@Bean
public IntegrationFlow webhookInboundFlow(
ConnectionFactory connectionFactory, ObjectMapper objectMapper,
HeaderValueRouter webhookInboundRouter) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, FORGETME_WEBHOOK_QUEUE_NAME)
.configureContainer(s -> s.defaultRequeueRejected(false))
)
.log(INFO)
.transform(new ObjectToJsonNodeTransformer(objectMapper))
.route(webhookInboundRouter)
.get();
}