Выполнение определенных шагов только после выполнения предыдущего шага в том же конвейере в потоке данных Apache - PullRequest
0 голосов
/ 03 мая 2019

Я хочу выполнить несколько шагов после нескольких начальных шагов. Например: в моем случае я хочу выполнить начальные 3 шага, затем последние 2 шага.

Как только эти 3 шага завершат выполнение, я хочу начать последние 2 шага.

with beam.Pipeline(options=pipeline_options) as p1:
    data_csv = p1 | 'Read CSV file' >> ReadFromText(known_args.input_csv_file)
    dict1 = (data_csv | 'Format to json' >> (beam.ParDo(Split())))
    (dict1 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                                        known_args.output_stage_bq,
                                        schema=product_revenue_schema
                                        ))
    fullTable = (p1 | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
    (fullTable | 'writeToBQ another dataset' >> beam.io.WriteToBigQuery(known_args.output_target_bq,
                            schema = product_revenue_schema))

Ожидается: 1: Шаг1 -> Шаг2 -> Шаг3 -> Шаг4 -> Шаг5

Факт: 1: Шаг1 -> Шаг2 -> Шаг3 2: Шаг 4 -> Шаг 5

1 Ответ

0 голосов
/ 03 мая 2019

В Beam Java SDK преобразование Wait - это то, что вам нужно.

В Beam Python SDK такого преобразования в данный момент нет. Вам следует использовать два отдельных конвейера и синхронизировать их вручную (например, дождаться окончания первого конвейера перед запуском второго конвейера или отправить сообщение pubsub из первого конвейера, чтобы сообщить второму конвейеру о том, что запись выполнена).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...