Кафка - десериализация объекта в Consumer - PullRequest
0 голосов
/ 29 июня 2018

Мы планируем использовать Kafka для обмена сообщениями, а наши приложения разрабатываются с использованием Spring. Итак, мы планировали использовать spring-kafka.

Производитель помещает сообщение как объект HashMap в очередь. У нас есть сериализатор JSON, и мы предполагали, что карта будет сериализована и помещена в очередь. А вот и конфиг производителя.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
        key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

С другой стороны, у нас есть слушатель, который слушает ту же тему, где продюсер опубликовал сообщение. Вот конфиг потребителя:

spring:
   kafka:
       consumer:
            group-id: xyz
            key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Наш метод слушателя:

  public void listener(SomeClass abx)

Мы ожидали, что json будет десериализован и будет сгенерирован объект типа «SomeClass». Но, по-видимому, это исключение десериализации.

Мы видели несколько статей, и было предложено сделать что-то вроде:

 @Bean
  public ConsumerFactory<String, Car> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(Car.class));
  }

Мы не хотим писать код для создания десериализатора. Есть ли что-то, что нам не хватает? Любая помощь будет оценена !!

Ответы [ 2 ]

0 голосов
/ 29 июня 2018

См. загрузочная документация . В частности:

Вы также можете настроить Spring Kafka JsonDeserializer следующим образом:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

0 голосов
/ 29 июня 2018

Вы можете взглянуть на Confluent: https://www.confluent.io/ В нескольких словах это еще один слой Kafka, который позволяет вам контролировать ваши данные Kafka.

Одной из функций Confluent является сериализация, у меня не было проблем с ее использованием таким образом:

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        (...)

        return props;
    }

С этими свойствами в Producer и Consumer у вас не должно возникнуть никаких проблем.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...