Как читать несколько файлов в Apache Beam из корзины GCP - PullRequest
0 голосов
/ 08 ноября 2019

Я пытаюсь прочитать и применить некоторые подмножества для нескольких файлов в GCP с помощью Apache Beam. Я подготовил два конвейера, которые работают только для одного файла, но не работают, когда я пробую их на нескольких файлах. Кроме того, мне было бы удобно объединить мои конвейеры, если это возможно, или есть способ организовать их так, чтобы они работали по порядку. Теперь конвейеры работают локально, но конечная цель - запустить их с Dataflow.

I textio.ReadFromText и textio.ReadAllFromText, но я не смог заставить ни одну из них работать в случае нескольких файлов.

def toJson(file):
    with open(file) as f:
        return json.load(f)


 with beam.Pipeline(options=PipelineOptions()) as p:
       files = (p
        | beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
        | beam.io.WriteToText("/home/test",
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

 with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p  
            | 'read_data' >> beam.Create(['test-00000-of-00001.json'])
            | "toJson" >> beam.Map(toJson)
            | "takeItems" >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
            | beam.combiners.Count.PerElement()
            | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))

Эти два конвейера хорошо работают для одного файла, но у меня есть сотни файлов в одном формате и я хотел бы использовать преимущества параллельных вычислений.

Есть ли способ заставить этот конвейер работать для нескольких файлов водин и тот же каталог?

Возможно ли сделать это в одном канале вместо создания двух разных конвейеров? (Не удобно записывать файлы на рабочие узлы из корзины.)

Большое спасибо!

1 Ответ

0 голосов
/ 11 ноября 2019

Я решил, как заставить работать несколько файлов, но не смог запустить его в одном конвейере. Я использовал для цикла, а затем луч. Опция Flatten.

Вот мое решение:

file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

with beam.Pipeline(options=PipelineOptions()) as p:
    for i,file in enumerate(file_list):
       (p 
        | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
        | "Write TExt {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
   for i,res in enumerate(res_list):
         pcol = (p   | 'read_data_{}'.format(i) >> beam.Create([res])
            | "toJson_{}".format(i) >> beam.Map(toJson)
            | "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
        pcols.append(pcol)
   out = (pcols
    | beam.Flatten()
    | beam.combiners.Count.PerElement()
    | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
...