Указанное поведение может быть достигнуто путем введения задачи, которая вызывает задержку указанной продолжительности между вашими Task 1
и Task 2
Этого можно добиться, используя PythonOperator
import time
from airflow.operators.python_operator import PythonOperator
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
dag=my_dag,
python_callable=lambda: time.sleep(300))
task_1 >> delay_python_task >> task_2
Или используя BashOperator
, а также
from airflow.operators.bash_operator import BashOperator
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
dag=my_dag,
bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2
Примечание: данные фрагменты кода НЕ проверены
Ссылки
UPDATE-1
Вот некоторые другие способы введения задержки
on_success_callback
/ on_failure_callback
: В зависимости от того, должен ли Task 2
работать в случае успеха или неудачи Task 1
, вы можете передать lambda: time.sleep(300)
в любом из этих параметров Task 1
pre_execute()
/ post_execute()
: вызов time.sleep(300)
в Task 1
'post_execute()
или Task 2
* pre_execute()
также будет иметь то же значение эффект. Конечно, это потребует изменения кода для вашего tasks
(1 или 2), поэтому лучше избегайте его
Лично я предпочел бы extra task
подход , потому что он делает вещи более явными и не ложно преувеличивает время выполнения вашего Task 1
или Task 2