Я думаю, что это не проблема с Кафкой, но с Джексоном.
Кафка использует Джексона по умолчанию для сериализации / десериализации. Вы можете зарегистрировать JavaTimeModule
для ObjectMapper
и построить JsonDeserializer
с ним. Затем создайте свой пользовательский ConsumerFactory
:
public ConsumerFactory<String, Request> consumerFactory() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
StringDeserializer keyDeserializer = new StringDeserializer();
JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(objectMapper);
ConsumerFactory<String, Request> factory = new DefaultKafkaConsumerFactory<>(config, keyDeserializer, jsonDeserializer);
return factory;
}
. Вам нужно будет добавить в свой проект зависимость jackson-datatype-jsr310 , чтобы сделать JavaTimeModule
доступным.
И если вы используете spring-boot и хотите настроить ObjectMapper
глобально, вы можете сделать следующее, например:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ConsumerFactory<String, Request> consumerFactory(Jackson2ObjectMapperBuilder objectMapperBuilder) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
StringDeserializer keyDeserializer = new StringDeserializer();
JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(objectMapperBuilder.build());
ConsumerFactory<String, Request> factory = new DefaultKafkaConsumerFactory<>(config, keyDeserializer, jsonDeserializer);
return factory;
}
@Bean
public Jackson2ObjectMapperBuilder objectMapperBuilder() {
Jackson2ObjectMapperBuilder builder = new Jackson2ObjectMapperBuilder();
builder.modulesToInstall(new JavaTimeModule());
return builder;
}
}