Как создать сообщение Avro в теме kafka из apache nifi, а затем прочитать его с помощью потоков kafka? - PullRequest
0 голосов
/ 09 октября 2018

Я хочу создать некоторые общие данные в теме kafka, используя apache nifi, и я хочу, чтобы эти данные были в формате avro.Что я для этого сделал:

  1. Создание новой схемы в реестре схем:

{"type": "record", "name": "my_schema "," namespace ":" my_namespace "," doc ":" "," fields ": [{" name ":" key "," type ":" int "}, {" name ":" value ","type": ["null", "int"]}, {"name": "event_time", "type": "long"}]}

Создание простого конвейера nifi: enter image description here Настройки ConvertAvroSchema: enter image description here Настройки PublishKafkaRecord: enter image description here Настройки AvroReader: enter image description here Настройки AvroRecordSetWriter: enter image description here

Затем я пытаюсь прочитать его, используя потоки kafka:

открытый класс Test {private final static static Logger logger =Logger.getLogger (KafkaFilterUsingCacheAvro.class);

public static void main(String[] args) {
    Properties properties = new Properties();

    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, GenericRecord> source = builder.stream("topic");
    source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, properties);
    streams.start();
}

}

GenericAvroSerde - https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java

И в результате я получаю ошибки:

Причина: org.apache.kafka.common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1. Причина: org.apache.kafka.common.errors.SerializationException: неизвестный магический байт!

Я также пытался явно указать схему avro в avroreader \ writer, но это не помогло.Кроме того, если я пытаюсь просто прочитать байты из темы и преобразовать их в строковое представление, я получу что-то вроде этого:

Objavro.schema {"type": "record", "name":"my_schema", "пространство имен": "my_namespace", "документ": "", "полей": [{ "имя": "ключ", "тип": "ИНТ"}, { "имя": "значение", "type": ["null", "int"]}, {"name": "event_time", "type": "long"}]} avro.codecsnappyÛ4ým [© q ÃàG0 ê¸ä »/} ½ {Ý4ým [© q ÃàG0

Как мне это исправить?

1 Ответ

0 голосов
/ 09 октября 2018

В процессоре PublishKafka ваш писатель Avro настроен со «Стратегией записи схемы» из «Встроенной схемы Avro».Это означает, что сообщения, записываемые в Kafka, являются стандартными сообщениями Avro с полной внедренной схемой.

На стороне потребителя (потоки Kafka) похоже, что он ожидает использовать реестр слитой схемы, и в этом случае этоне ожидая встроенной схемы Avro, она ожидает специальной последовательности байтов, указывающей идентификатор схемы, за которой следует простое сообщение Avro.

Если вы хотите, чтобы ваш потребитель оставался без изменений, то на стороне NiFi вы будетехотите изменить «Стратегию записи схемы» вашего Avro Writer на «Справочник реестра схемы слияния».Я думаю, что для этого также может потребоваться изменить читатель Avro для доступа к схеме с использованием службы реестра Confluent Schema.

В качестве альтернативы, возможно, есть способ заставить Kafka Streams читать встроенную схему и не использовать схему Confluent.реестра, но я раньше не использовал Kafka Streams, поэтому не могу сказать, возможно ли это.

...