Получение исключения при чтении из темы kafka: вызвано: org.apache.kafka.common.errors.SerializationException: ошибка десериализации сообщения Avro для идентификатора 1, вызванного: org.apache.kafka.common.errors.SerializationException: Не удалось найти класс USERS, указанный в схеме писателя, при поиске схемы считывателя для SpecificRecord.
Я думаю, что десериализация не верна, и я также не могу найти подходящий пример.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG , "TEST");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
StreamsBuilder builder = new StreamsBuilder();
KStream<String,USERS> valid = builder.stream("testUSERS");
valid.foreach((k,v)-> System.out.println("v ="+v.getUSERNAME()));
valid.foreach((k,v)-> System.out.println("k ="+k));
KafkaStreams streams = new KafkaStreams(builder.build(),props);
streams.cleanUp();
streams.start();