Я хочу выполнить несколько шагов после нескольких начальных шагов.
Например: в моем случае я хочу выполнить начальные 3 шага, затем последние 2 шага.
Как только эти 3 шага завершат выполнение, я хочу начать последние 2 шага.
with beam.Pipeline(options=pipeline_options) as p1:
data_csv = p1 | 'Read CSV file' >> ReadFromText(known_args.input_csv_file)
dict1 = (data_csv | 'Format to json' >> (beam.ParDo(Split())))
(dict1 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
known_args.output_stage_bq,
schema=product_revenue_schema
))
fullTable = (p1 | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
(fullTable | 'writeToBQ another dataset' >> beam.io.WriteToBigQuery(known_args.output_target_bq,
schema = product_revenue_schema))
Ожидается: 1: Шаг1 -> Шаг2 -> Шаг3 -> Шаг4 -> Шаг5
Факт: 1: Шаг1 -> Шаг2 -> Шаг3
2: Шаг 4 -> Шаг 5