MatchResult.Medata
содержит resourceId
, который вы уже используете, но не соответствует пути GCS (с подстановочными знаками), которому он соответствует.
Вы можете достичь того, что хотите, используя боковые входы.Чтобы продемонстрировать это, я создал следующее filesAndSources
(согласно вашему комментарию это может быть входной параметр, поэтому он не может быть жестко задан в нисходящем направлении):
PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
KV.of("gs://" + Bucket + "/events/*", "Events")));
Я материализую это в побочный ввод (в этом случае как Map
).Ключом будет шаблон глобуса, преобразованный в регулярное выражение (благодаря этот ответ ), а значением будет исходная строка:
final PCollectionView<Map<String, String>> regexAndSources =
filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String regex = c.element().getKey();
StringBuilder out = new StringBuilder("^");
for(int i = 0; i < regex.length(); ++i) {
final char ch = regex.charAt(i);
switch(ch) {
case '*': out.append(".*"); break;
case '?': out.append('.'); break;
case '.': out.append("\\."); break;
case '\\': out.append("\\\\"); break;
default: out.append(ch);
}
}
out.append('$');
c.output(KV.of(out.toString(), c.element().getValue()));
}})).apply("Save as Map", View.asMap());
Затем, после чтения имен файлов, мы можемиспользуйте боковой ввод для анализа каждого пути, чтобы определить, какая пара соответствует шаблону / источнику:
filesAndSources
.apply("Get file names", Keys.create())
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();
Set<Map.Entry<String,String>> patternSet = c.sideInput(regexAndSources).entrySet();
for (Map.Entry< String,String> pattern:patternSet)
{
if (fileName.matches(pattern.getKey())) {
String source = pattern.getValue();
c.output(KV.of(fileName, source));
}
}
}}).withSideInputs(regexAndSources))
Обратите внимание, что преобразование регулярного выражения выполняется, когда перед вводом бокового ввода вместо того, чтобы избежать дублирования работы.
Вывод, как и ожидалось в моем случае:
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events
Полный код .