Поток Kafka, использующий SpecificAvroSerde в Java Не удалось найти ошибку класса - PullRequest
0 голосов
/ 18 мая 2018

Я готовлю приложение Kafka Stream с Java.Задача использует некоторые сообщения из темы, которая содержит сообщение, сгенерированное соединителем JDBC с

io.confluent.connect.avro.AvroConverter в качестве преобразователя ключей и значений.

Сообщения можно использовать с помощью bin/kafka-avro-console-consumer в командном режиме без каких-либо проблем.

Я сгенерировал класс (с именем myobject) в моей среде IDE с помощью подключаемого модуля Maven, и сгенерированный класс находится под

{project}/target/generated-sourcess/avro/com.myproject.myclasses/myobject

В моем основном классеЯ могу создать объект с помощью класса myobject без каких-либо проблем.

Я импортировал этот сгенерированный класс в мой основной класс

import com.myproject.myclasses.myobject;

У меня есть следующие конфигурации в моем потоковом приложении

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());       
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    config.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

В моем скрипте Stream Builder у меня есть

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url","http://localhost:8081");
    Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
    final Serde<myObject> valueSpecificAvroSerde = new SpecificAvroSerde<>();
    valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, myObject> purchaseInvoices = builder.stream(this.sourceTopic
            , Consumed.with(Serdes.String(), valueSpecificAvroSerde));

Я получил следующую ошибку ниже:

Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class mybject specified in writer's schema whilst finding reader's schema for a SpecificRecord.

Согласно моему тестированию, я считаю, что приложение может извлечь запись из моей темы.Более того, когда я использую подход GenericAvroSerde, все работает отлично, и я могу успешно использовать сообщение.Однако подход GenericAvroSerde должен иметь дополнительные строки для сопоставления сообщения из темы Kafka в myObject, и я предполагаю, что подход SpecificAvroSerde может сохранить эту часть отображения, так что я действительно хочу, чтобы подход SpecificAvroSerde работал.

Есть совет?Является ли это, что я поместил myObject в неправильное место, чтобы мое приложение (я просто запускаю в своей локальной IDE с Confluent на той же машине) не смог найти myObject?

Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...