У меня есть потребительское приложение, которое прослушивает тему kafka, за исключением того, что оно отправляет запись в dlq (очередь недоставленных сообщений), как и ожидалось.
Перед тем как выдать Exception, я устанавливаю заголовок в потребительском приложении. и он устанавливается, но не отправляется в тему недоставленных сообщений, тема недоставленных сообщений имеет только встроенные заголовки, такие как «x-exception-message», «x-original-partition» и т. д., а не тотчто я установил.
Ниже приведен фрагмент кода в моем приложении Consumer:
modifiedMessage = MessageBuilder.fromMessage(consumedMessage).setHeader("x-ecode", new Integer(100)).setHeader(BinderHeaders.PARTITION_OVERRIDE,consumedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID)).build();
System.out.println("error header:"+modifiedMessage.getHeaders().get("x-ecode",Integer.class)); //100
throw new RuntimeException(modifiedMessage.toString());
Примечание: Iam устанавливает X-код в моем приложении.yml под spring.cloud.stream.kafka.binder.header = x-ecode
В приведенном выше фрагменте кода я могу установить заголовок и фактически проверил, что он установлен, но этоне отправлено в тему недоставленных сообщений ..
Полезная нагрузка отправляется правильно. Как я могу отправить этот заголовок в тему недоставленных сообщений? мне нужно добавить какие-либо свойства в мой application.yml, чтобы включить его отправку?