Я хочу создать некоторые общие данные в теме kafka, используя apache nifi, и я хочу, чтобы эти данные были в формате avro.Что я для этого сделал:
- Создание новой схемы в реестре схем:
{"type": "record", "name": "my_schema "," namespace ":" my_namespace "," doc ":" "," fields ": [{" name ":" key "," type ":" int "}, {" name ":" value ","type": ["null", "int"]}, {"name": "event_time", "type": "long"}]}
Создание простого конвейера nifi:
![enter image description here](https://i.stack.imgur.com/n647o.png)
Настройки ConvertAvroSchema:
![enter image description here](https://i.stack.imgur.com/04O87.png)
Настройки PublishKafkaRecord:
![enter image description here](https://i.stack.imgur.com/YNFg9.png)
Настройки AvroReader:
![enter image description here](https://i.stack.imgur.com/Eq0jU.png)
Настройки AvroRecordSetWriter:
Затем я пытаюсь прочитать его, используя потоки kafka:
открытый класс Test {private final static static Logger logger =Logger.getLogger (KafkaFilterUsingCacheAvro.class);
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> source = builder.stream("topic");
source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, properties);
streams.start();
}
}
GenericAvroSerde - https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java
И в результате я получаю ошибки:
Причина: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1. Причина: org.apache.kafka.common.errors.SerializationException: неизвестный магический байт!
Я также пытался явно указать схему avro в avroreader \ writer, но это не помогло.Кроме того, если я пытаюсь просто прочитать байты из темы и преобразовать их в строковое представление, я получу что-то вроде этого:
Objavro.schema {"type": "record", "name":"my_schema", "пространство имен": "my_namespace", "документ": "", "полей": [{ "имя": "ключ", "тип": "ИНТ"}, { "имя": "значение", "type": ["null", "int"]}, {"name": "event_time", "type": "long"}]} avro.codecsnappyÛ4ým [© q ÃàG0 ê¸ä »/} ½ {Ý4ým [© q ÃàG0
Как мне это исправить?