Произвольные заголовки из сообщения запроса по умолчанию не копируются в ответное сообщение, только kafka_correlationId
.
Начиная с версии 2.2, вы можете настроить ReplyHeadersConfigurer
, который вызывается, чтобы определить, какие заголовки должны быть скопированы.
См. документацию .
Начиная с версии 2.2, вы можете добавить ReplyHeadersConfigurer
к фабрике-слушателю контейнера. Это делается для определения того, какие заголовки вы хотите установить в ответном сообщении.
EDIT
Кстати, в 2.2 RKT автоматически устанавливает responseTo, если заголовка нет.
С 2.1.x это можно сделать, но это немного сложнее, и вам придется выполнять часть работы самостоятельно. Ключ должен получить и ответить Message<?>
...
@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
System.out.println(in);
Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
return MessageBuilder.withPayload(in.getPayload().toUpperCase())
.setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader(KafkaHeaders.TOPIC, replyTo)
.build();
}
// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
kafkaTemplate.setMessageConverter(messageConverter);
return kafkaTemplate;
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
Headers headers = new RecordHeaders();
headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
ConsumerRecord<String, String> reply = future.get();
System.out.println("Reply: " + reply.value() + " myHeader="
+ new String(reply.headers().lastHeader("myHeader").value()));
};
}