Apache Beam TextIO.ReadAll Как генерировать KeyValue вместо String of Pcollection - PullRequest
0 голосов
/ 21 декабря 2018

Трубопровод начинается с чтения из PUBSUBIo.Сообщение внутри PubSub IO - это путь к файлу GCS.Я знаю, что могу использовать ReadAll() для вывода строк из каждого пути.Однако это не соответствует моей цели ( Информация о пути к файлу потеряна ).Что мне нужно, это испустить KV<'Filepath','Lines inside files'>.

Сообщения PubSUB будут выглядеть так:

Message1 -> gs://folder1/Topic1/topicfile1.gz
Message2 -> gs://folder1/Topic2/topicfile2.gz

Предположим, что содержимое файла примерно так:

topicfile1.gz
{
topic1.line1
topic1.line2
}

topicfile2.gz
{
topic2.line1
topic2.line2
}

ЧтоЯ ожидаю, что это pcollection, как показано ниже

{KV<'gs://folder1/Topic1/topicfile1.gz','topic1.line1'>}
{KV<'gs://folder1/Topic1/topicfile1.gz','topic1.line2'>}
{KV<'gs://folder1/Topic2/topicfile2.gz','topic2.line1'>}
{KV<'gs://folder1/Topic2/topicfile2.gz','topic2.line2'>}

Я не смог найти способ прочитать файл из пути внутри функции ParDo, чтобы сопоставить путь с линиями.

Надеюсь, это понятно.

1 Ответ

0 голосов
/ 28 декабря 2018

Я не думаю, что это поддерживается в TextIO из коробки, если я правильно понял вопрос.

Подробности

Когда вы применяете преобразования какreadAll() Существует несколько шагов между получением начальных путей к файлам из IO и выводом всех строк из всех файлов в конце.

Например, логика в TextIO:

  • принимает PCollection путей к файлам (или шаблонов путей);
  • применяет FileIO.matchAll(), который преобразует PCollection шаблонов путей в PCollection из MatchResult.Metadata объектов, описывающих эти пути;
  • , затем применяется FileIO.readMatches(), который преобразует объекты метаданных в ReadableFile объекты, описывающие конкретные файлы;
  • и, наконец, применяетсяTextIO.readFiles(), который принимает ReadableFile и выводит все строки из этого файла;
    • на этом последнем шаге вы захотите добавить путь к файлу к выводу, чтобы вы знали, какая строка происходит из какого файла.Что могло бы помочь, если бы была возможность изменить последний шаг для выдачи KV<ReadableFile, String> вместо просто строк, чтобы вы могли получить доступ к пути к файлу, используя ReadableFile.metadata.

Обращаясь к этому коду, кажется, что выделение необработанных строк из файлов является единственным поддерживаемым способом выполнения действий с использованием TextIO прямо сейчас.

Обходные пути

Возможносамый простой способ - написать свой PTransform, аналогичный TextIO.ReadAll.Это будет работать примерно так:

Высокий уровень:

  • Создайте и настройте собственную версию TextIO.ReadAll;
  • И из ReadAllViaFileBasedSource;
  • Измените свою версию ReadAllViaFileBasedSource, чтобы выдавать то, что вы хотите;
  • Используйте эту пользовательскую версию TextIO.ReadAll, которая использует вашу пользовательскую версию ReadAllViaFileBasedSource, которая выдает правильные вещи;

Чуть более подробно:

...