Я пытаюсь установить специальный конвертер сообщений для моего обработчика сообщений Spring Integration Kafka (да, я знаю, что могу предоставить конфигурации сериализатора - я пытаюсь сделать что-то немного другое).
Iиметь следующее:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
final KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setMessageConverter(new MessagingMessageConverter() {
@Override
public ProducerRecord<?, ?> fromMessage(final Message<?> message, final String s) {
LOGGER.info("fromMessage({}, {})", message, s);
return super.fromMessage(message, s);
}
});
return kafkaTemplate;
}
@Bean
@ServiceActivator(inputChannel = "kafkaMessageChannel")
public MessageHandler kafkaMessageHandler() {
final KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression(getTopic()));
handler.setSendSuccessChannel(kafkaSuccessChannel());
return handler;
}
Когда сообщение отправляется на kafkaMessageChannel
, обработчик отправляет его, и результат отображается в kafkaSuccessChannel
, но значение RecordMessageConverter
, которое я установил в шаблоне, никогда не вызывалось