Я создаю конвейер потока данных, и у меня возникают проблемы с ветвлением и объединением выходных данных. Я хочу построить следующий конвейер:
- Прочтите некоторые входные данные
input_data
. - A. Извлеките метри c,
metric_1
, на input_data
. B. Извлеките некоторые другие метри c, metric_2
на input_data
- Поскольку эти извлечения метри c требуют больших вычислительных ресурсов, я хочу отделиться от основного
input_data
и объединить выходные данные потом для дальнейшего расчета. Объединить выходы output
.
Вот пример кода, который инкапсулирует мой реальный конвейер
class ReadData(beam.DoFn):
def process(self, element):
# read from source
return [{'input': np.random.rand(100,10)}]
class GetFirstMetric(beam.DoFn):
def process(self, element):
# some processing
return [{'first': np.random.rand(100,4)}]
class GetSecondMetric(beam.DoFn):
def process(self, element):
# some processing
return [{'second': np.random.rand(100,3)}]
def run():
with beam.Pipeline() as p:
input_data = (p | 'read sample data' >> beam.ParDo(ReadData()))
metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))
output = ((metric_1, metric_2)
| beam.Flatten()
| beam.combiners.ToList()
| beam.Map(print)
)
Когда я запускаю это, я получаю ошибку 'PBegin' object has no attribute 'windowing'
. Я видел несколько примеров и пример кода для выполнения чего-то подобного в Java. Но я не мог найти подходящие ресурсы для того же в Python. У меня следующий вопрос:
Как правильно разветвлять и объединять pcollections (особенно если ветки исходят из общего входа)?
Есть ли лучшая конструкция трубопровода для достижения того же?
Заранее спасибо!