Воздушный поток создает динамические задачи в одной группе обеспечения доступности баз данных, задача N + 1 зависит от задачи N - PullRequest
0 голосов
/ 28 сентября 2018

При динамическом создании задач мне нужно, чтобы Задача 2 зависела от Задачи 1, Задачи 1 >> Задачи 2 или task2.set_upstream (task1).

Поскольку значения task_ids оцениваются или, по-видимому, являются предварительными, Я не могу установить зависимость заранее, любая помощь приветствуется.

Задачи компонента (I) генерируют нормально, за исключением того, что все они запускаются одновременно.

for i in range(1,10):
  task_id='Component'+str(i)
  task_id = BashOperator(
  task_id='Component'+str(i),
  bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
  xcom_push=True,
  dag=dag) 
  ?????.set_upstream(??????)

Ответы [ 2 ]

0 голосов
/ 29 сентября 2018

Используйте следующий код:

a = []
for i in range(0,10):
    a.append(BashOperator(
        task_id='Component'+str(i),
        bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
        xcom_push=True,
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]

При использовании DummyOperator коды выглядят следующим образом:

a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]

Это приведет к созданию следующего DAG:

enter image description here

0 голосов
/ 28 сентября 2018

Вы можете следовать шаблону, подобному следующему:

with dag:

d1 = DummyOperator(task_id='kick_off_dag')

for i in range(0, 5):
    d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
    d1 >> d2

Это сгенерирует 5 задач ниже d1.

...