Как я могу заставить Kafka Consumer обрабатывать тип LocalDate? - PullRequest
1 голос
/ 02 апреля 2020

Я изменил одно из моих полей с типа string на LocalDate, и теперь мой Kafka Consumer не может его обработать. После прочтения ошибки ниже, похоже, мне нужно десериализовать это значение. Я думал, что изменение consumerFactory ниже для включения LocalDateDeserializer решит эту проблему, но это не так. Нужно ли создавать собственный десериализатор или я могу изменить некоторые настройки в моем consumerFactory?

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[...125]] from topic [Service.Topic]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDate` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

Код:

public ConsumerFactory<String, Request> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LocalDateDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Ответы [ 2 ]

1 голос
/ 03 апреля 2020

Это решило проблему! Я использовал приведенные ниже аннотации при объявлении поля.

@JsonDeserialize(using = LocalDateDeserializer.class)
@JsonSerialize(using = LocalDateSerializer.class)
1 голос
/ 02 апреля 2020

Я думаю, что это не проблема с Кафкой, но с Джексоном.

Кафка использует Джексона по умолчанию для сериализации / десериализации. Вы можете зарегистрировать JavaTimeModule для ObjectMapper и построить JsonDeserializer с ним. Затем создайте свой пользовательский ConsumerFactory:

public ConsumerFactory<String, Request> consumerFactory() {

     ObjectMapper objectMapper = new ObjectMapper();
     objectMapper.registerModule(new JavaTimeModule());

     Map<String, Object> config = new HashMap<>();
     config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

     StringDeserializer keyDeserializer = new StringDeserializer();
     JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(objectMapper);

     ConsumerFactory<String, Request> factory = new DefaultKafkaConsumerFactory<>(config, keyDeserializer,  jsonDeserializer);
     return factory;
}

. Вам нужно будет добавить в свой проект зависимость jackson-datatype-jsr310 , чтобы сделать JavaTimeModule доступным.

И если вы используете spring-boot и хотите настроить ObjectMapper глобально, вы можете сделать следующее, например:

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, Request> consumerFactory(Jackson2ObjectMapperBuilder objectMapperBuilder) {

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(objectMapperBuilder.build());

        ConsumerFactory<String, Request> factory = new DefaultKafkaConsumerFactory<>(config, keyDeserializer,  jsonDeserializer);
        return factory;
    }

    @Bean
    public Jackson2ObjectMapperBuilder objectMapperBuilder() {
        Jackson2ObjectMapperBuilder builder = new Jackson2ObjectMapperBuilder();
        builder.modulesToInstall(new JavaTimeModule());
        return builder;
    }
}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...