Как установить последующую задачу после завершения 2 задач - PullRequest
0 голосов
/ 11 апреля 2019

Я выполняю задачи (T1 и T2) параллельно. Как установить последующую задачу T3, которая зависит от успешного завершения как T1, так и T2

[T1,T2].set_downstream(T3)

Но, получая следующую ошибку AttributeError: у объекта 'list' нет атрибута 'set_downstream'

Ответы [ 2 ]

0 голосов
/ 12 апреля 2019

Как настроить зависимость восходящего и нисходящего потоков:

Давайте рассмотрим этот пример: он содержит 3 задачи, поэтому вы можете установить Восходящая-нисходящая зависимость, подобная этой:

[t1, t2] >> t3

t1 = PythonOperator(
    task_id='t1',
    dag=dag,
    python_callable=call_me,
)

t2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=call_me_too(),
)

t3 = PythonOperator(
    task_id='puller',
    dag=dag,
    python_callable=status_chcker,
    trigger_rule="all_done"
)



[t1, t2] >> t3  

Как установить условие для запуска задания ниже по потоку:

trigger_rule = "all_done"

Существует несколько правил триггера, в соответствии с которыми могут запускаться последующие задания.

  • all_success: (по умолчанию) все родители преуспели

  • all_failed: все родители находятся в состоянии отказа или в состоянии upstream_failed

  • all_done: все родители закончили казнь

  • one_failed: срабатывает, как только один из родителей вышел из строя, не ждать, пока все родители закончатся

  • one_success: срабатывает, как только хотя бы одному из родителей удается, не ждать, пока все родители закончатся

  • none_failed: все родители не потерпели неудачу (ошибка или upstream_failed) все родители преуспели или были пропущены

  • none_skipped: ни один из родителей не пропущен, т. Е. Все родители в состоянии успеха, сбоя или состояния upstream_failed

  • Пустышка: зависимости только для показа, триггер по желанию

0 голосов
/ 11 апреля 2019

Существует несколько способов реализации зависимостей задач, Чтение - Управление зависимостями или просто посмотрите примеры ниже.

T1.set_downstream(T3)
T2.set_downstream(T3)

или

T3.set_upstream([T1, T2])

или

[T1, T2] >> T3

...