Поток данных / Apache - как получить доступ к текущему имени файла при передаче в шаблоне? - PullRequest
0 голосов
/ 21 ноября 2018

Я уже видел ответ на этот вопрос ранее при переполнении стека (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow),, но не с тех пор, как в apache beam добавлена ​​функция splittable dofn для python. Как получить доступ к имени файла текущего файла, обрабатываемого при передаче в файлшаблон в корзину gcs?

Я хочу передать имя файла в мою функцию преобразования:

with beam.Pipeline(options=pipeline_options) as p:                              
    lines = p | ReadFromText('gs://url to file')                                        


    data = (                                                                    
        lines                                                                   
        | 'Jsonify' >> beam.Map(jsonify)                                        
        | 'Unnest' >> beam.FlatMap(unnest)                                      
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(                  
            'project_id:dataset_id.table_name', schema=schema,                     
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,    
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)       
        )                                                   

В конечном итоге, я хочу передать имя файла в мою функцию преобразования при преобразованиикаждая строка json (см. this , а затем используйте имя файла, чтобы выполнить поиск в другой таблице BQ, чтобы получить значение). Я думаю, что как только мне удастся узнать, как получить имя файла, я смогувыяснить боковую часть ввода, чтобы выполнить поиск в таблице bq и получить уникальное значение.

1 Ответ

0 голосов
/ 24 ноября 2018

Я пытался реализовать решение с ранее приведенным случаем .Там, как и в других подходах, таких как , этот , они также получают список имен файлов, но загружают весь файл в один элемент, который может плохо масштабироваться с большими файлами.Поэтому я рассмотрел добавление имени файла к каждой записи.

В качестве ввода я использовал два файла csv:

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france

Используя GCSFileSystem.match, мы можем получить доступ к metadata_list, чтобы получить FileMetadata, содержащуюпуть к файлу и размер в байтах.В моем примере:

[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
 FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]

Код:

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]

Мы будем читать каждый из соответствующих файлов в другой PCollection.Поскольку мы не знаем количество файлов априори, нам нужно программно создать список имен для каждой коллекции PCollection (p0, p1, ..., pN-1) и обеспечить наличие уникальных меток для каждого шага ('Read file 0', 'Read file 1', etc.):

variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

Затем мы переходим к чтению каждого отдельного файла в соответствующую ему PCollection с ReadFromText, а затем мы вызываем AddFilenamesFn ParDo, чтобы связать каждую запись с именем файла.

for i in range(len(result)):   
  globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

, где AddFilenamesFn:

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename':file_name, 'row':element}

Моим первым подходом было использование функции Map напрямую, что привело к упрощению кода.Тем не менее, result[i].path был разрешен в конце цикла, и каждая запись была неправильно сопоставлена ​​с последним файлом списка:

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))

Наконец, мы объединяем все PCollections в один:

merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()

и мы проверяем результаты, регистрируя элементы:

INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}

Я проверил это с DirectRunner и DataflowRunner для Python SDK 2.8.0.

IНадеемся, что это решит основную проблему здесь, и вы можете продолжить, интегрировав BigQuery в ваш полный сценарий использования.Возможно, вам придется использовать клиентскую библиотеку Python для этого, я написал похожий Java пример .

Полный код:

import argparse, logging
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename':file_name, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  run()
...