У меня есть плоская коллекция PCollection, которая содержит пути к файлам
PCollection<String> "/this/is/a/123/*.csv , /this/is/a/124/*.csv"
flattenPCollection = pcs.apply(Flatten.<String>pCollections());
Я хочу прочитать каждый файл и получить имя файла и процесс
flattenPCollection
.apply("Read CSV files", FileIO.matchAll())
.apply("Read matching files",FileIO.readMatches())
.apply("Process each file", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void process(@Element FileIO.ReadableFile file) {
// We shloud be able to file and its metadata.
logger.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
// here we read each line and process
}
}));
Следующая ошибка происходит
Caused by: java.io.FileNotFoundException: No files matched spec: bob,22,new york
кажется, что конвейер читает первые строки файла csv и ищет эту строку в файловой системе.
Что вызывает это?
Я хочу получить каждый файл как FileIO.ReadableFile
Я уверен, что это что-то очень простое, чего мне не хватает. Любая помощь приветствуется
ОБНОВЛЕНИЕ
Если у вас есть PCollection путей и файлов, вы вручную l oop поверх каждого из них и добавьте ParDo
for(String path : pathList) {
pipeline.apply(FileIO.match().filepattern(path))
.apply(FileIO.readMatches())
.apply(
ParDo.of(
new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void process(@Element FileIO.ReadableFile file) throws IOException {
logger.info("Metadata - " + file.getMetadata());
logger.info("File Contents - " + file.readFullyAsUTF8String());
logger.info("File Metadata resourceId is " + file.getMetadata().resourceId());
}
}));
}
Спасибо кому: @ bigbounty