Я пытаюсь использовать Apache Beam для анализа списка файлов Avro. Как часть анализа, я хотел бы включить имя исходного файла вместе с записью, которая была проанализирована из файла Avro. Я использую AvroIO.parseAllGenericRecords
для этого, но я не вижу способа получить доступ к исходному имени файла.
Я смотрел на подобные вопросы в SO, но они не относятся к AvroIO.parseAllGenericRecords
. В частности, о AvroIO.parseAllGenericRecords
мне нужно выполнить преобразование внутри SerializableFunction
, у которого нет доступа к ProcessContext
, что многие из связанных решений предлагают для доступа к исходному имени файла.
Pipeline p = ...;
PCollection<String> filepatterns = p.apply(...);
PCollection<Foo> records =
filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
public Foo apply(GenericRecord record) {
Foo foo = new Foo();
// foo.setSourceFilename(); // set source filename for the object
foo.setField1(record.get("field1"));
foo.setField2(record.get("field2"));
return foo;
}
}));