Мой конвейер создает Pcollection с уникальными пространствами имен, которые я хочу передать в качестве параметра запроса в ReadFromDatastore, а затем сгладить все чтения.
Похоже, невозможно передать итеративный список пространств имен в качестве побочного ввода
coll = (p
| 'Reading from Datastore' >> ReadFromDatastore(project=PROJECT,query=query,
namespace=beam.pvalue.AsIter(unique_namespaces)
|'Flattening' >> beam.Flatten() )
Подход, который работает, заключается в том, чтобы сохранить PCollection unique_namespaces в виде CSV-файла в хранилище, затем открыть его и для каждой строки сделать следующее:
f = open('unique_namespaces.csv')
unique_namespaces = csv.reader(f)
for namespace in unique_namespaces:
label1 = 'Reading from Datastore: ' + namespace
label2 = 'Flatting: ' + namespace
read = p | label1 >> ReadFromDatastore(project=PROJECT,query=query,
namespace=namespace)
merged = ((merged,read) | label2 >> beam.Flatten())
Вопросы:
1) Что является более элегантным способом чтения из хранилища данных в нескольких пространствах имен?
2) Как обеспечить побочный ввод для преобразования ReadFromDatastore?
3) Как заставить python считывать файл из облачного хранилища ПОСЛЕ ТОГО, КАК он появляется там от записи луча до преобразования текста?
Спасибо!