Как распараллелить группы доступности базы данных apache-beam (Dataflow) - PullRequest
0 голосов
/ 02 октября 2018

Я использую apache-beam 2.5.0 python SDK

Прикрепляя фрагмент кода в конвейере, я беру i / p из раздела pubsub, разбираю его и хочу выполнить какую-то операцию, когдаЯ запустил его с DataflowRunner, он работает нормально, но кажется, что «обработка данных fun1», «обработка данных fun2» «обработка данных fun3» выполняются последовательно, мне нужно, чтобы он работал параллельно.Я новичок в потоке данных.

Есть ли способ распараллелить его?

def run():
   parser = argparse.ArgumentParser()
   args, pipeline_args = parser.parse_known_args()
   options = PipelineOptions(pipeline_args)

   with beam.Pipeline(options=options) as p:
       data = (p | "Read Pubsub Messages" >> 
            beam.io.ReadFromPubSub(topic=config.pub_sub_topic)
              | "Parse messages " >> beam.Map(parse_pub_sub_message_with_bq_data)
            )

       data | "data processing fun1 " >> beam.ParDo(Fun1())
       data | "data processing fun2" >> beam.ParDo(Fun2())
       data | "data processing fun3" >> beam.ParDo(Fun3())
if __name__ == '__main__':
   run()

1 Ответ

0 голосов
/ 02 октября 2018

Зачем вам нужно, чтобы эти функции запускались одновременно?

Beam / Dataflow возьмет ваш график и попытается оптимизировать вещи, которые могут работать в одном потоке.Это называется оптимизацией fusion и упоминается в статье Flume Java .

. Дело в том, что, как правило, более эффективно запускать эти функции по одной наодин и тот же поток, а не для обмена данными между несколькими потоками обработки или виртуальными машинами для распараллеливания обработки.

Если ваши функции должны выполняться более или менее параллельно, вы можете добавить beam.Reshuffleпреобразование перед преобразованием нисходящего потока:

data = (p 
        | beam.io.ReadFromPubSub(topic)
        | beam.Map(parse_messages))

# After the data has been shuffled, it may be consumed by multiple workers
data | beam.Reshuffle() | beam.ParDo(Fun1())
data | beam.Reshuffle() | beam.ParDo(Fun2())
data | beam.Reshuffle() | beam.ParDo(Fun3())

Дайте мне знать, если я могу добавить некоторые детали в этом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...