Использование KafkaAvroDeserializer с Alpakka - PullRequest
0 голосов
/ 31 мая 2019

У меня есть SchemaRegistry и KafkaBroker, из которого я беру данные с Avro v1.8.1. Для десериализации я использовал Confluent KafkaAvroDeserializer . Теперь я намеревался провести рефакторинг своего кода, чтобы использовать Elasticsearch API , предоставленный Alpakka, но, к сожалению, это нарушает десериализацию, поскольку приводит к NullPointerExceptions:

Исключение в потоке "main" org.apache.kafka.common.errors.SerializationException: Ошибка десериализации ключа / значения для раздела-0 раздела со смещением 0. При необходимости, пожалуйста, просмотрите запись, чтобы продолжить потребление. Вызвано: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора 2 Вызывается: java.lang.NullPointerException в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer.java:116) в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer.java:88) на io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize (KafkaAvroDeserializer.java:55) на org.apache.kafka.common.serialization.Deserializer.deserialize (Deserializer.java:58) в org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord (Fetcher.java:1030) в org.apache.kafka.clients.consumer.internals.Fetcher.access $ 3300 (сборщик.java:110) в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.fetchRecords (Fetcher.java:1250) в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.access $ 1400 (сборщик.java:1099) в org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords (Fetcher.java:545) в org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords (Fetcher.java:506) в org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches (KafkaConsumer.java:1269) в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1200) в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1176) по адресу de.adesso.fds.connectors.dpa.news.NewsConsumer.main (MyConsumer.java:58)

Я использовал API ConsumerSettings от Alpakka, как описано в этом примере :

val system = ActorSystem.create();

// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

Эти настройки приводят к исключениям NullPointerException, в то время как этот ванильный реквизит Kafka Consumer работает нормально:

val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);

В рабочем примере значения ConsumerRecords успешно десериализованы в классы, сгенерированные AvroMavenPlugin из схемы.

Любые советы приветствуются!

1 Ответ

1 голос
/ 04 июня 2019

Я думаю, вам нужно вывести new KafkaAvroDeserializer() к своей собственной переменной, а затем вызвать метод .configure() в этом экземпляре, чтобы передать ненулевой URL-адрес реестра.

Затем передать настроенный экземпляр вConsumerSettings.create

FWIW, в зависимости от ваших потребностей, Kafka Connect прекрасно работает для загрузки Elasticsearch

...