Доступ к информации (метаданным) в имени и типе файла в конвейере Beam - PullRequest
0 голосов
/ 31 января 2019

Мое имя файла содержит информацию, которая мне нужна в моем конвейере, например, идентификатор для моих точек данных является частью имени файла, а не полем в данных.Например, каждая ветряная турбина генерирует файл турбина-loc-001-007.csv.Например, мне нужны данные loc в конвейере.

1 Ответ

0 голосов
/ 31 января 2019

Java (sdk 2.9.0):

Лучи Считыватели TextIO не дают доступа к самому имени файла, для этих случаев использования нам необходимо использовать FileIO для сопоставления файлов и получения доступа к информации.хранится в имени файла.В отличие от TextIO, чтение файла должно осуществляться пользователем в преобразованиях после чтения FileIO.Результатом чтения FileIO является PCollection. Класс ReadableFile содержит имя файла в качестве метаданных, которое может использоваться вместе с содержимым файла.

FileIO имеет удобный метод readFullyAsUTF8String (), который будет считывать весь файл в объект String, сначала он будет считывать весь файл в память.Если проблема связана с памятью, вы можете напрямую работать с файлом с помощью служебных классов, таких как FileSystems.

От: Ссылка на документ

PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(KVs(strings(), strings()))
         .via((ReadableFile f) -> KV.of(
             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));

Python (SDK 2.9.0):

Для версии 2.9.0 для python вам потребуется собрать список URI извне конвейера потока данных и передать его в качестве параметра в конвейер.Например, использование FileSystems для чтения в списке файлов с помощью шаблона Glob, а затем передача его в PCollection для обработки.

Как только fileio см. PR https://github.com/apache/beam/pull/7791/, следующий код также будет возможен для Python.

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
  readable_files = (p 
                    | fileio.MatchFiles(‘hdfs://path/to/*.txt’)
                    | fileio.ReadMatches()
                    | beam.Reshuffle())
  files_and_contents = (readable_files 
                        | beam.Map(lambda x: (x.metadata.path, 
                                              x.read_utf8()))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...