Apache Beam: как одновременно создать много PCollections, которые подвергаются одному и тому же PTransform? - PullRequest
0 голосов
/ 22 мая 2018

Заранее спасибо!

[+] Проблема:

У меня много файлов в облаке Google, для каждого файла мне нужно:

  1. получитьфайл
  2. Сделайте несколько вызовов API Google-Cloud-Storage для каждого файла, чтобы проиндексировать его (например, name = blob.name, size = blob.size)
  3. распаковать его
  4. поиск материалов там
  5. размещение информации индексации + материалов, найденных внутри файла, в таблице BigQuery

Я использовал python2.7 и Google-Cloud-SDK.Это занимает часов , если я запускаю его линейно.Мне было предложено Apache Beam / DataFlow для параллельной обработки.

[+] Что я смог сделать:

Я могу читать из одного файла, выполнять преобразование PT и записывать в другой файл.

def loadMyFile(pipeline, path):
    return pipeline | "LOAD" >> beam.io.ReadFromText(path)

def myFilter(request):
    return request

with beam.Pipeline(options=PipelineOptions()) as p:
    data = loadMyFile(pipeline,path)
    output = data | "FILTER" >> beam.Filter(myFilter)
    output | "WRITE" >> beam.io.WriteToText(google_cloud_options.staging_location)

[+] Что я хочу сделать:

Как я могу загрузить многие из этих файлов одновременно, выполнить одно и то же преобразование в них параллельно, затем параллельно записать вбольшой запрос?

Схема того, что я хочу выполнить

[+] Что я прочитал:

https://beam.apache.org/documentation/programming-guide/ http://enakai00.hatenablog.com/entry/2016/12/09/104913

Еще раз большое спасибо

Ответы [ 2 ]

0 голосов
/ 30 мая 2018

Хорошо, так что я решил это, выполнив следующее:

1) получить название корзины откуда-то |first PCollection

2) получить список BLOB-объектов из этого списка |вторая PCollection

3) сделать FlatMap, чтобы получить капли индивидуально из списка |третий PCollection

4) сделать ParDo, который получает метаданные

5) записать в BigQuery

, мой конвейер выглядит так:

    with beam.Pipeline(options=options) as pipe:
        bucket      = pipe    | "GetBucketName"            >> beam.io.ReadFromText('gs://example_bucket_eraseme/bucketName.txt')
    listOfBlobs = bucket      | "GetListOfBlobs"           >> beam.ParDo(ExtractBlobs())
    blob        = listOfBlobs | "SplitBlobsIndividually"   >> beam.FlatMap(lambda x: x)
    dic         = blob        | "GetMetaData"              >> beam.ParDo(ExtractMetadata())
    dic                       | "WriteToBigQuery"          >> beam.io.WriteToBigQuery(
0 голосов
/ 22 мая 2018

textio принимает file_pattern .

Из Python sdk:

file_pattern (str) - путь к файлу для чтения в качестве локального пути к файлу или пути GCS gs: //.Путь может содержать символы глобуса

Например, предположим, что у вас есть *.txt файлов в хранилище gs://my-bucket/files/, вы можете сказать:

with beam.Pipeline(options=PipelineOptions()) as p:
  (p 
  | "LOAD" >> beam.io.textio.ReadFromText(file_pattern="gs://my-bucket/files/*.txt")
  | "FILTER" >> beam.Filter(myFilter)
  | "WRITE" >> beam.io.textio.WriteToText(output_ocation)

Если у вас есть несколько PCollections одного и того же типа, вы также можете Свести их в один

merged = (
  (pcoll1, pcoll2, pcoll3)
  # A list of tuples can be "piped" directly into a Flatten transform.
  | beam.Flatten())
...