Kafka Avro сериализует / десериализует в конкретный тип, используя сбой реестра схемы - PullRequest
0 голосов
/ 21 февраля 2020

Кажется, я не могу использовать сообщения в их конкретной реализации avro, я получаю следующее исключение:

class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass

Вот код (я использую Spring Boot)

MyProducer. java

private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;

public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

public void sendMessage(MyConcreteClass myConcreteClass) {
    this.kafkaTemplate.send(topic, myConcreteClass);
}

MyConsumer. java

@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
    //handle
}

Обратите внимание, что если я изменю все на GenericRecord, десериализация работает должным образом, поэтому я знаю, что все конфигурации (не вставленные) настроены правильно.

Также может быть важно отметить, что я не зарегистрировал схему самостоятельно, и вместо этого позволил своему клиентскому коду сделать это для меня.

Есть идеи?

РЕДАКТИРОВАТЬ:

Конфиг:

@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    return new DefaultKafkaConsumerFactory<>(props);
}

Ответы [ 2 ]

0 голосов
/ 23 февраля 2020

MyConcreteClass необходимо расширить SpecificRecord

Вы можете использовать плагин Avro maven для генерации его из схемы

Затем необходимо настроить сериализатор так, чтобы он знал, что вы хотите использовать указанные c записи.

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") ;

0 голосов
/ 21 февраля 2020

Вам необходимо настроить свою потребительскую конфигурацию. ContentDeserializer должен быть KafkaAvroDeserializer со ссылкой на реестр вашей схемы.

...