Как добавить дополнительные пользовательские заголовки при выдаче исключения для хранения сообщения в dlq на kafka? - PullRequest
1 голос
/ 19 октября 2019

У меня есть потребительское приложение, которое прослушивает тему kafka, за исключением того, что оно отправляет запись в dlq (очередь недоставленных сообщений), как и ожидалось.

Перед тем как выдать Exception, я устанавливаю заголовок в потребительском приложении. и он устанавливается, но не отправляется в тему недоставленных сообщений, тема недоставленных сообщений имеет только встроенные заголовки, такие как «x-exception-message», «x-original-partition» и т. д., а не тотчто я установил.

Ниже приведен фрагмент кода в моем приложении Consumer:

modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();
System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100
throw new RuntimeException(modifiedMessage.toString());

Примечание: Iam устанавливает X-код в моем приложении.yml под spring.cloud.stream.kafka.binder.header = x-ecode

В приведенном выше фрагменте кода я могу установить заголовок и фактически проверил, что он установлен, но этоне отправлено в тему недоставленных сообщений ..

Полезная нагрузка отправляется правильно. Как я могу отправить этот заголовок в тему недоставленных сообщений? мне нужно добавить какие-либо свойства в мой application.yml, чтобы включить его отправку?

1 Ответ

1 голос
/ 19 октября 2019

Все, что вы делаете - это создаете новое сообщение и отбрасываете его (за исключением того, что вы устанавливаете сообщение об исключении в качестве его строковой реализации.

Это не изменит исходное входящее сообщение.

Вы можете просто добавить новую выходную привязку и отправить ей измененное сообщение самостоятельно.

Или, если вы только добавляете константу, вы можете добавить ChannelInterceptor в канал ошибок привязки и изменить там сообщение. Если вам нужно передать состояние перехватчику, вы можете использовать пользовательское исключение.

Однако самое простое решение - просто опубликовать сообщение самостоятельно, а не использовать механизм DLQ связывателя. У вас уже есть логика для созданиясообщение будет просто myCustomDlqBinding.send(modifiedMessage) (также добавляя также стандартные заголовки DLQ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...