Я пытаюсь прочитать и применить некоторые подмножества для нескольких файлов в GCP с помощью Apache Beam. Я подготовил два конвейера, которые работают только для одного файла, но не работают, когда я пробую их на нескольких файлах. Кроме того, мне было бы удобно объединить мои конвейеры, если это возможно, или есть способ организовать их так, чтобы они работали по порядку. Теперь конвейеры работают локально, но конечная цель - запустить их с Dataflow.
I textio.ReadFromText и textio.ReadAllFromText, но я не смог заставить ни одну из них работать в случае нескольких файлов.
def toJson(file):
with open(file) as f:
return json.load(f)
with beam.Pipeline(options=PipelineOptions()) as p:
files = (p
| beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
| beam.io.WriteToText("/home/test",
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p
| 'read_data' >> beam.Create(['test-00000-of-00001.json'])
| "toJson" >> beam.Map(toJson)
| "takeItems" >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
Эти два конвейера хорошо работают для одного файла, но у меня есть сотни файлов в одном формате и я хотел бы использовать преимущества параллельных вычислений.
Есть ли способ заставить этот конвейер работать для нескольких файлов водин и тот же каталог?
Возможно ли сделать это в одном канале вместо создания двух разных конвейеров? (Не удобно записывать файлы на рабочие узлы из корзины.)
Большое спасибо!