Список коллекций ветвления и объединения в Apache Луч из общего входа - PullRequest
0 голосов
/ 05 мая 2020

Я создаю конвейер потока данных, и у меня возникают проблемы с ветвлением и объединением выходных данных. Я хочу построить следующий конвейер:

  1. Прочтите некоторые входные данные input_data.
  2. A. Извлеките метри c, metric_1, на input_data. B. Извлеките некоторые другие метри c, metric_2 на input_data
  3. Поскольку эти извлечения метри c требуют больших вычислительных ресурсов, я хочу отделиться от основного input_data и объединить выходные данные потом для дальнейшего расчета. Объединить выходы output.

Вот пример кода, который инкапсулирует мой реальный конвейер

class ReadData(beam.DoFn):
    def process(self, element):
        # read from source
        return [{'input': np.random.rand(100,10)}]


class GetFirstMetric(beam.DoFn):
    def process(self, element):
        # some processing
        return [{'first': np.random.rand(100,4)}]


class GetSecondMetric(beam.DoFn):
    def process(self, element):
        # some processing
        return [{'second': np.random.rand(100,3)}]


def run():
    with beam.Pipeline() as p:
        input_data = (p | 'read sample data' >> beam.ParDo(ReadData()))

        metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
        metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))

        output = ((metric_1, metric_2) 
                  | beam.Flatten()
                  | beam.combiners.ToList()
                  | beam.Map(print)
        )

Когда я запускаю это, я получаю ошибку 'PBegin' object has no attribute 'windowing'. Я видел несколько примеров и пример кода для выполнения чего-то подобного в Java. Но я не мог найти подходящие ресурсы для того же в Python. У меня следующий вопрос:

  1. Как правильно разветвлять и объединять pcollections (особенно если ветки исходят из общего входа)?

  2. Есть ли лучшая конструкция трубопровода для достижения того же?

Заранее спасибо!

1 Ответ

1 голос
/ 06 мая 2020

В этом коде ваша проблема в том, что вы не «запускаете» начальную коллекцию PCollection. В ReadData.process - каково значение переменной element?

Ну, бегун не может придумать значение, потому что нет входной pcollection. Вам нужно создать свою первую коллекцию PCollection. Вы бы сделали что-то вроде следующего кода ...

Что касается внесения их в список - возможно, стратегия побочного ввода может сработать. CПопробуйте следующее:

def run():
    with beam.Pipeline() as p:
        starter_pcoll = p | beam.Create(['any'])
        input_data = (starter_pcoll | 'read sample data' >> beam.ParDo(ReadData()))

        metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
        metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))

        side_in = beam.pvalue.AsList((metric_1, metric_2) 
                                     | beam.Flatten())

        p | beam.Create(['any']) | beam.Map(lambda x, si: print(si),
                                            side_in)

Это должно заставить ваш конвейер работать. Рады уточнить ваши c вопросы в дальнейшем.

...