Я готовлю приложение Kafka Stream с Java.Задача использует некоторые сообщения из темы, которая содержит сообщение, сгенерированное соединителем JDBC с
io.confluent.connect.avro.AvroConverter
в качестве преобразователя ключей и значений.
Сообщения можно использовать с помощью bin/kafka-avro-console-consumer
в командном режиме без каких-либо проблем.
Я сгенерировал класс (с именем myobject) в моей среде IDE с помощью подключаемого модуля Maven, и сгенерированный класс находится под
{project}/target/generated-sourcess/avro/com.myproject.myclasses/myobject
В моем основном классеЯ могу создать объект с помощью класса myobject без каких-либо проблем.
Я импортировал этот сгенерированный класс в мой основной класс
import com.myproject.myclasses.myobject;
У меня есть следующие конфигурации в моем потоковом приложении
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
config.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
В моем скрипте Stream Builder у меня есть
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url","http://localhost:8081");
Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
final Serde<myObject> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<String, myObject> purchaseInvoices = builder.stream(this.sourceTopic
, Consumed.with(Serdes.String(), valueSpecificAvroSerde));
Я получил следующую ошибку ниже:
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class mybject specified in writer's schema whilst finding reader's schema for a SpecificRecord.
Согласно моему тестированию, я считаю, что приложение может извлечь запись из моей темы.Более того, когда я использую подход GenericAvroSerde, все работает отлично, и я могу успешно использовать сообщение.Однако подход GenericAvroSerde должен иметь дополнительные строки для сопоставления сообщения из темы Kafka в myObject, и я предполагаю, что подход SpecificAvroSerde может сохранить эту часть отображения, так что я действительно хочу, чтобы подход SpecificAvroSerde работал.
Есть совет?Является ли это, что я поместил myObject в неправильное место, чтобы мое приложение (я просто запускаю в своей локальной IDE с Confluent на той же машине) не смог найти myObject?
Заранее спасибо.