Сериализация авро данных с использованием apache beam (KafkaIO.write) - PullRequest
0 голосов
/ 31 марта 2020

Чтобы опубликовать sh данные в формате avro на kafka topi c:

Я пытаюсь создать обобщенную c запись с использованием AvroCoder и KafkaAvroSerializer. Однако сталкиваются с проблемами. Ниже приведен код и ошибка. Любая помощь будет принята с благодарностью.

   final public static TupleTag<GenericRecord> tag1 = new 
            TupleTag<GenericRecord>() {
    };

     public static final TupleTagList tagList = TupleTagList.of(tag2).and(tag3);

 PCollectionTuple mixedCollection =
                inputStream.apply("check", ParDo.of(new TransformAVRO()).
                        withOutputTags(tag1, tagList));

        Schema schema = new Schema.Parser().parse(
                getClass().getResourceAsStream("/schema.avsc"));
        AvroCoder<GenericRecord> genericCoder = AvroCoder.of(schema);

        PCollection<GenericRecord> testAvro = mixedCollection.
        get(tag2).setCoder(genericCoder);

Ошибка: java .lang.IllegalStateException: Невозможно вернуть кодировщик по умолчанию для aadccheck.out2 [PCollection]. Исправьте одну из следующих root причин: кодер не был указан вручную; Вы можете сделать это, используя .setCoder (). Не удалось получить кодер из CoderRegistry: не удалось предоставить кодер для организации. apache .avro.generi c .GenericRecord. Построить кодер с использованием зарегистрированного CoderProvider не удалось. См. Исключенные исключения для подробных сбоев. Не удалось использовать выходной кодер по умолчанию из производящего PTransform: вызван PTransform.getOutputCoder.

...