Spring Kafka @SendTo не отправляет заголовки - PullRequest
0 голосов
/ 11 апреля 2019

Я отправляю сообщение в Kafka с помощью ReplyingKafkaTemplate, и оно отправляет сообщение с kafka_correlationId. Однако, когда он попадает в мой метод @KafkaListener и перенаправляет его в тему ответа, заголовки теряются.

Как сохранить заголовки кафки?

Вот моя подпись метода:

@KafkaListener(topics = "input")
@SendTo("reply")
public List<CustomOutput> consume(List<CustomInput> inputs) {
  ... /* some processing */
  return outputs;
}

Я создал ProducerInterceptor, чтобы я мог видеть, какие заголовки отправляются из ReplyingKafkaTemplate, а также из аннотации @SendTo. Из этого, еще одна странная вещь заключается в том, что ReplyingKafkaTemplate не добавляет документированный заголовок kafka_replyTopic к сообщению.

Вот как настроен ReplyingKafkaTemplate:

@Bean
public KafkaMessageListenerContainer<Object, Object> replyContainer(ConsumerFactory<Object, Object> cf) {
  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
  return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(ProducerFactory<Object, Object> pf, KafkaMessageListenerContainer<Object, Object> container) {
  return new ReplyingKafkaTemplate<>(pf, container);
}

Я не уверен, что это уместно, но я добавил Spring Cloud Sleuth в качестве зависимости, а заголовки span / trace присутствуют при отправке сообщений, но новые они генерируются, когда сообщение пересылается.

1 Ответ

1 голос
/ 11 апреля 2019

Произвольные заголовки из сообщения запроса по умолчанию не копируются в ответное сообщение, только kafka_correlationId.

Начиная с версии 2.2, вы можете настроить ReplyHeadersConfigurer, который вызывается, чтобы определить, какие заголовки должны быть скопированы.

См. документацию .

Начиная с версии 2.2, вы можете добавить ReplyHeadersConfigurer к фабрике-слушателю контейнера. Это делается для определения того, какие заголовки вы хотите установить в ответном сообщении.

EDIT

Кстати, в 2.2 RKT автоматически устанавливает responseTo, если заголовка нет.

С 2.1.x это можно сделать, но это немного сложнее, и вам придется выполнять часть работы самостоятельно. Ключ должен получить и ответить Message<?> ...

@KafkaListener(id = "so55622224", topics = "so55622224")
@SendTo("dummy.we.use.the.header.instead")
public Message<?> listen(Message<String> in) {
    System.out.println(in);
    Headers nativeHeaders = in.getHeaders().get(KafkaHeaders.NATIVE_HEADERS, Headers.class);
    byte[] replyTo = nativeHeaders.lastHeader(KafkaHeaders.REPLY_TOPIC).value();
    byte[] correlation = nativeHeaders.lastHeader(KafkaHeaders.CORRELATION_ID).value();
    return MessageBuilder.withPayload(in.getPayload().toUpperCase())
            .setHeader("myHeader", nativeHeaders.lastHeader("myHeader").value())
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .build();
}

// This is used to send the reply - needs a header mapper
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
    KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    MessagingMessageConverter messageConverter = new MessagingMessageConverter();
    messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper("*")); // map all byte[] headers
    kafkaTemplate.setMessageConverter(messageConverter);
    return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
    return args -> {
        Headers headers = new RecordHeaders();
        headers.add(new RecordHeader("myHeader", "myHeaderValue".getBytes()));
        headers.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "so55622224.replies".getBytes())); // automatic in 2.2
        ProducerRecord<String, String> record = new ProducerRecord<>("so55622224", null, null, "foo", headers);
        RequestReplyFuture<String, String, String> future = template.sendAndReceive(record);
        ConsumerRecord<String, String> reply = future.get();
        System.out.println("Reply: " + reply.value() + " myHeader="
                + new String(reply.headers().lastHeader("myHeader").value()));
    };
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...