Мне нужно установить сериализатор пользовательских значений в KafkaTemplate Spring . Значение сериализатора выглядит следующим образом:
JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper);
(например, для применения PropertyNamingStrategy.SNAKE_CASE
)
это легко сделать с KafkaProducer:
new KafkaProducer<>(properties, Serdes.String().serializer(), valueSerializer);
но я не нашел возможности сделать то же самое с KafkaTemplate
. Я вижу только props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
свойство для ProducerFactory
, но это не то, что я ищу (невозможно предоставить конкретный экземпляр).
Мы создаем KafkaTemplate следующим образом:
@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}
private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setDefaultTopic("topicName");
return kafkaTemplate;
}
Также найден следующий метод:
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));
, но он применяет преобразование, только если мы работаем с экземпляром Message
Spring и игнорируем предоставленное преобразование, если мы вызываем методы отправки с ключом и значением.
spring-kafka версия: 2.1.0.RELEASE