Как получить все метаданные файлов из PCollection <string>в луче - PullRequest
0 голосов
/ 10 июля 2020

У меня есть плоская коллекция PCollection, которая содержит пути к файлам

PCollection<String> "/this/is/a/123/*.csv , /this/is/a/124/*.csv"
flattenPCollection = pcs.apply(Flatten.<String>pCollections());

Я хочу прочитать каждый файл и получить имя файла и процесс

        flattenPCollection
                .apply("Read CSV files", FileIO.matchAll())
                .apply("Read matching files",FileIO.readMatches())
                .apply("Process each file", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {

                    @ProcessElement
                    public void process(@Element FileIO.ReadableFile file) {
                        // We shloud be able to file and its metadata.
                        logger.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
                      // here we read each line and process
                    }

                }));

Следующая ошибка происходит

Caused by: java.io.FileNotFoundException: No files matched spec: bob,22,new york

кажется, что конвейер читает первые строки файла csv и ищет эту строку в файловой системе.

Что вызывает это?

Я хочу получить каждый файл как FileIO.ReadableFile

Я уверен, что это что-то очень простое, чего мне не хватает. Любая помощь приветствуется

ОБНОВЛЕНИЕ

Если у вас есть PCollection путей и файлов, вы вручную l oop поверх каждого из них и добавьте ParDo


        for(String path : pathList) {
            pipeline.apply(FileIO.match().filepattern(path))
                    .apply(FileIO.readMatches())
                    .apply(
                            ParDo.of(
                                    new DoFn<FileIO.ReadableFile, String>() {
                                        @ProcessElement
                                        public void process(@Element FileIO.ReadableFile file) throws IOException {
                                            logger.info("Metadata - " + file.getMetadata());
                                            logger.info("File Contents - " + file.readFullyAsUTF8String());
                                            logger.info("File Metadata resourceId is " + file.getMetadata().resourceId());
                                        }
                                    }));
        }

Спасибо кому: @ bigbounty

1 Ответ

0 голосов
/ 10 июля 2020
Pipeline pipeline = Pipeline.create();
pipeline.apply(FileIO.match().filepattern("/Users/bigbounty/Documents/beam/files/*.txt"))
                .apply(FileIO.readMatches())
                .apply(
                        ParDo.of(
                                new DoFn<FileIO.ReadableFile, String>() {
                                    @ProcessElement
                                    public void process(@Element FileIO.ReadableFile file) throws IOException {
                                        LOG.info("Metadata - " + file.getMetadata());
                                        LOG.info("File Contents - " + file.readFullyAsUTF8String());
                                        LOG.info("File Metadata resourceId is " + file.getMetadata().resourceId());
                                    }
                                }));
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();

Вывод:

Metadata - Metadata{resourceId=/Users/bigbounty/Document/beam/files/3.txt, sizeBytes=7, isReadSeekEfficient=true, lastModifiedMillis=0}
Metadata - Metadata{resourceId=/Users/bigbounty/Document/beam/files/1.txt, sizeBytes=7, isReadSeekEfficient=true, lastModifiedMillis=0}
Metadata - Metadata{resourceId=/Users/bigbounty/Document/beam/files/2.txt, sizeBytes=7, isReadSeekEfficient=true, lastModifiedMillis=0}
File Contents - hello-1
File Metadata resourceId is /Users/bigbounty/Document/beam/files/1.txt
File Contents - hello-2
File Metadata resourceId is /Users/bigbounty/Document/beam/files/2.txt
File Contents - hello-3
File Metadata resourceId is /Users/bigbounty/Document/beam/files/3.txt
...