У меня есть SchemaRegistry и KafkaBroker, из которого я беру данные с Avro v1.8.1. Для десериализации я использовал Confluent KafkaAvroDeserializer . Теперь я намеревался провести рефакторинг своего кода, чтобы использовать Elasticsearch API , предоставленный Alpakka, но, к сожалению, это нарушает десериализацию, поскольку приводит к NullPointerExceptions:
Исключение в потоке "main" org.apache.kafka.common.errors.SerializationException: Ошибка десериализации ключа / значения для раздела-0 раздела со смещением 0. При необходимости, пожалуйста, просмотрите запись, чтобы продолжить потребление.
Вызвано: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора 2
Вызывается: java.lang.NullPointerException
в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer.java:116)
в io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize (AbstractKafkaAvroDeserializer.java:88)
на io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize (KafkaAvroDeserializer.java:55)
на org.apache.kafka.common.serialization.Deserializer.deserialize (Deserializer.java:58)
в org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord (Fetcher.java:1030)
в org.apache.kafka.clients.consumer.internals.Fetcher.access $ 3300 (сборщик.java:110)
в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.fetchRecords (Fetcher.java:1250)
в org.apache.kafka.clients.consumer.internals.Fetcher $ PartitionRecords.access $ 1400 (сборщик.java:1099)
в org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords (Fetcher.java:545)
в org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords (Fetcher.java:506)
в org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches (KafkaConsumer.java:1269)
в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1200)
в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1176)
по адресу de.adesso.fds.connectors.dpa.news.NewsConsumer.main (MyConsumer.java:58)
Я использовал API ConsumerSettings от Alpakka, как описано в этом примере :
val system = ActorSystem.create();
// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
.withBootstrapServers(kafkaBootstrapServerUrl)
.withClientId(InetAddress.getLocalHost().getHostName())
.withGroupId("" + new Random().nextInt())
.withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withStopTimeout(Duration.ofSeconds(5));
Эти настройки приводят к исключениям NullPointerException, в то время как этот ванильный реквизит Kafka Consumer работает нормально:
val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);
В рабочем примере значения ConsumerRecords успешно десериализованы в классы, сгенерированные AvroMavenPlugin из схемы.
Любые советы приветствуются!