Вы можете напрямую использовать org.springframework.kafka.support.serializer.JsonDeserializer.class
, как показано ниже
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
Итак, потребительская конфигурация будет:
private ConsumerFactory<String, MyDomainModel> myMessageFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KAPP");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyDomainModel.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyDomainModel> myMessageListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyDomainModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(myMessageFactory());
return factory;
}