Как добавить дополнительное поле к лучу FileIO.matchAll () результат? - PullRequest
0 голосов
/ 23 февраля 2019

У меня есть PCollection KV, где ключом является gcs file_patterns, а значением является некоторая дополнительная информация о файлах (например, системы «Source», которые генерировали файлы).Например,

KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")

Мне нужен PTransferm, чтобы развернуть file_patterns для всех соответствующих файлов в папках GCS и сохранить поле «Источник».Например, если есть два файла X1.dat, X2.dat в dir1 и один файл (Y1.dat) в dir2, вывод будет:

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX")
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

Могу ли я использовать FileIO.matchAll () длядостичь этого?Я застрял на том, как объединить / присоединить поле «Источник» к соответствующим файлам.Это то, что я пытался, еще не совсем там:

public PCollection<KV<String, String> expand(PCollection<KV<String, String>> filesAndSources) {
      return filesAndSources
          .apply("Get file names", Keys.create()) 
          .apply(FileIO.matchAll())
          .apply(FileIO.readMatches())
          .apply(ParDo.of(
            new DoFn<ReadableFile, KV<String, String>>() {

              @ProcessElement
              public void processElement(ProcessContext c) {
                 ReadableFile file = c.element();
                 String fileName = file.getMetadata().resourceId().toString();
                 c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?

Моя трудность - последняя строка, для XXXXX, как мне получить поле значения ("Источник") из входного KV?Любой способ «соединить» или «объединить» значение входного KV обратно с «затраченными» ключами, так как один ключ (file_pattern) расширяется до нескольких значений.

Спасибо!

1 Ответ

0 голосов
/ 23 февраля 2019

MatchResult.Medata содержит resourceId, который вы уже используете, но не соответствует пути GCS (с подстановочными знаками), которому он соответствует.

Вы можете достичь того, что хотите, используя боковые входы.Чтобы продемонстрировать это, я создал следующее filesAndSources (согласно вашему комментарию это может быть входной параметр, поэтому он не может быть жестко задан в нисходящем направлении):

PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
    Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
              KV.of("gs://" + Bucket + "/events/*", "Events")));

Я материализую это в побочный ввод (в этом случае как Map).Ключом будет шаблон глобуса, преобразованный в регулярное выражение (благодаря этот ответ ), а значением будет исходная строка:

final PCollectionView<Map<String, String>> regexAndSources =
filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String regex = c.element().getKey();

    StringBuilder out = new StringBuilder("^");
    for(int i = 0; i < regex.length(); ++i) {
        final char ch = regex.charAt(i);
        switch(ch) {
            case '*': out.append(".*"); break;
            case '?': out.append('.'); break;
            case '.': out.append("\\."); break;
            case '\\': out.append("\\\\"); break;
            default: out.append(ch);
        }
    }
    out.append('$');
    c.output(KV.of(out.toString(), c.element().getValue()));
}})).apply("Save as Map", View.asMap());

Затем, после чтения имен файлов, мы можемиспользуйте боковой ввод для анализа каждого пути, чтобы определить, какая пара соответствует шаблону / источнику:

filesAndSources
  .apply("Get file names", Keys.create()) 
  .apply(FileIO.matchAll())
  .apply(FileIO.readMatches())
  .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        ReadableFile file = c.element();
        String fileName = file.getMetadata().resourceId().toString();

        Set<Map.Entry<String,String>> patternSet = c.sideInput(regexAndSources).entrySet();    

        for (Map.Entry< String,String> pattern:patternSet) 
        { 
            if (fileName.matches(pattern.getKey())) {
              String source = pattern.getValue();
              c.output(KV.of(fileName, source));
            }
        }
     }}).withSideInputs(regexAndSources))

Обратите внимание, что преобразование регулярного выражения выполняется, когда перед вводом бокового ввода вместо того, чтобы избежать дублирования работы.

Вывод, как и ожидалось в моем случае:

Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events

Полный код .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...