Как вызвать задачу в воздушном потоке, если предварительная задача не выполнена и зависит от_поня = истина? - PullRequest
0 голосов
/ 11 июля 2019

У меня задание нужно запускать каждые 5 минут.И следующая задача должна ждать выполнения предыдущей задачи (независимо от успеха или неудачи предыдущей задачи).

Вот как я настраиваю параметры.

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 7, 1, 8, 30),
    'max_active_runs': 1,
    'depends_on_past': True,
    'execution_timeout': timedelta(seconds=300)
}

dag = DAG(
    dag_id='dag1', default_args=default_args,
    schedule_interval='*/5 8-16 * * *',
    dagrun_timeout=timedelta(minutes=600))

def task1(ds, **kwargs):
    #do something

task1 = PythonOperator(
    task_id='task1',
    provide_context=True,
    python_callable=task1,
    trigger_rule= 'all_done',
    dag=dag)

Под моимКонфигурация, задача1 будет запущена, когда предыдущее состояние успешно, и будет заблокирована, когда предыдущее состояние не удалось.Я нашел описание в документе airflow: «trigger_rule можно использовать в сочетании с depen_on_past (логическим), который при значении True препятствует запуску задачи, если предыдущее расписание для задачи не выполнено».

Итак, как мне достичь цели запуска задачи 1, пока предыдущая задача1 выполнена, и неважно, каково предварительное состояние?

1 Ответ

1 голос
/ 13 июля 2019

max_active_runs: 1 разрешит только одно выполнение dag одновременно, покрывая ваше ожидание предыдущего требования.

У вас есть 2 параметра зависящих_последних. Удалите один и установите для него значение false.

...