В Apache Beam, как мне получить доступ к исходному имени файла при использовании AvroIO.parseAllGenericRecords? - PullRequest
0 голосов
/ 09 мая 2019

Я пытаюсь использовать Apache Beam для анализа списка файлов Avro. Как часть анализа, я хотел бы включить имя исходного файла вместе с записью, которая была проанализирована из файла Avro. Я использую AvroIO.parseAllGenericRecords для этого, но я не вижу способа получить доступ к исходному имени файла.

Я смотрел на подобные вопросы в SO, но они не относятся к AvroIO.parseAllGenericRecords. В частности, о AvroIO.parseAllGenericRecords мне нужно выполнить преобразование внутри SerializableFunction, у которого нет доступа к ProcessContext, что многие из связанных решений предлагают для доступа к исходному имени файла.

Pipeline p = ...;
PCollection<String> filepatterns = p.apply(...);
PCollection<Foo> records =
    filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
        public Foo apply(GenericRecord record) {
            Foo foo = new Foo();
            // foo.setSourceFilename(); // set source filename for the object
            foo.setField1(record.get("field1"));
            foo.setField2(record.get("field2"));
            return foo;
        }
}));
...