Ошибка записи луча в файл avro - PullRequest
0 голосов
/ 25 апреля 2019

Я последовал примеру записи файла AVRO в Документация по Beam . Но это дает мне ошибку Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema на p.run().waitUntilFinish() шаге. Однако, если я читаю из файла AVRO и записываю его в другой вывод AVRO, он работает нормально. Моя цель - написать файл AVRO из любого произвольного источника ввода. Кто-нибудь видел подобные проблемы? Как ты это решил?

public class WriteAvro {

public interface CsvToAvroOptions extends PipelineOptions {

    @Description("Path of the file to read from")
    @Default.String("test.avro")
    String getInputFile();

    void setInputFile(String value);
}

static void run(CsvToAvroOptions options) throws IOException {
    final Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
    Pipeline p = Pipeline.create(options);
    // This works fine
    // PCollection<GenericRecord> input = p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFile()));

    // This doesn't work
    PCollection<GenericRecord> input =
            p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
                    .apply(ParDo.of(new DoFn<String, GenericRecord>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            GenericRecord record = new GenericData.Record(schema);
                            record.put("name", "John Doe");
                            record.put("age", 42);
                            record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
                            c.output(record);
                        }
                    }))
                    .setCoder(AvroCoder.of(GenericRecord.class, schema));

    input.apply(AvroIO.writeGenericRecords(schema).to("prefix"));
    p.run().waitUntilFinish();
}


public static void main(String[] args) throws IOException {
    CsvToAvroOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToAvroOptions.class);

    run(options);
}
}
  • Версия луча: 2.11.0
  • Бегун: прямой бегун

1 Ответ

1 голос
/ 25 апреля 2019

Schema не сериализуемо, что вызывает эту ошибку.Вы можете сохранить схему в виде текста и проанализировать ее во время настройки DoFn.

Вот как вы можете это сделать.


public interface CsvToAvroOptions extends PipelineOptions {

    @Description("Path of the file to read from")
    @Default.String("test.avro")
    String getInputFile();

    void setInputFile(String value);
}



  private static class ConstructAvroRecordsFn extends DoFn<String, GenericRecord> {

    private final String schemaJson;
    private Schema schema;

    ConstructAvroRecordsFn(Schema schema){
      schemaJson = schema.toString();
    }

    @Setup
    public void setup(){
      schema = new Schema.Parser().parse(schemaJson);
    }
    @ProcessElement
    public void processElement(ProcessContext c) {
      GenericRecord record = new GenericData.Record(schema);
      record.put("name", "John Doe");
      record.put("age", 42);
      record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
      c.output(record);
    }
  }

static void run(CsvToAvroOptions options) throws IOException {
    final Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
  Pipeline p = Pipeline.create(options);
    // This works fine
    // PCollection<GenericRecord> input = p.apply(AvroIO.readGenericRecords(schema).from(options.getInputFile()));

    // This doesn't work
    PCollection<GenericRecord> input =
            p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
                    .apply(ParDo.of(new ConstructAvroRecordsFn(schema)))
                    .setCoder(AvroCoder.of(GenericRecord.class, schema));

    input.apply(AvroIO.writeGenericRecords(schema).to("prefix"));
    p.run().waitUntilFinish();
}


public static void main(String[] args) throws IOException {
    CsvToAvroOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToAvroOptions.class);

    run(options);
}
}
...