после сбоя задачи поток воздуха перестает планировать дагруны - PullRequest
0 голосов
/ 30 августа 2018

Меня смущает то, что делает воздушный поток, если дагрун выходит из строя. Поведение, которого я хочу достичь:

  1. Регулярные триггеры DAG (ежечасно)
  2. Повтор для задачи
  3. Если задание не выполнено n попыток, отправьте электронное письмо об ошибке
  4. Когда сработает следующий почасовой триггер, активируйте новый дагрун, как будто ничего не вышло.

Это мои аргументы dag и аргументы задачи:

задание по умолчанию:

'depends_on_past': True,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['email@address.co.uk'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'wait_for_downstream': False,

аргументы Дага:

schedule_interval=timedelta(minutes=60),
catchup=False,
max_active_runs=1

Я думаю, что я неправильно понимаю некоторые из этих аргументов, потому что мне кажется, что если задача терпит неудачу n раз (т. Е. Происходит сбой dagrun), то следующий dagrun назначается по расписанию, а просто остается в работающем состоянии навсегда, и больше никаких dagrun'ов не бывает преуспеть (или даже получить по расписанию). например, вот дагруны (я не знал, где найти текстовые журналы планировщика, как в этот вопрос ), где даги должны запускаться каждые 5 минут, а не каждый час:

enter image description here

Выполнение выполняется каждые 5 минут до сбоя, после чего последнее выполнение находится только в состоянии выполнения и было так в течение последних 30 минут.

Что я сделал не так?

Я должен добавить, что перезапуск планировщика не помогает, равно как и ручная установка этой выполняемой задачи на неудачу ...

Ответы [ 2 ]

0 голосов
/ 07 августа 2019

Этот вопрос вызвал у меня сильную головную боль, поэтому я хочу опубликовать полное решение.

В моем случае выполнение следующей группы доступности базы данных не начиналось, когда предыдущее выполнение не удавалось, даже если у меня была опция disabled_on_past = False. Это было связано с тем, что опция wait_for_downstream была True, и эта комбинация несовместима. Согласно документации:

wait_for_downstream (bool) - если задано значение true, экземпляр задачи X будет> ждать, пока задачи, расположенные сразу после предыдущего экземпляра задачи X, не завершатся успешно, прежде чем он запустится. Это полезно, если разные экземпляры задачи> X изменяют один и тот же актив, и этот актив используется задачами, расположенными ниже по потоку от задачи X.> Обратите внимание, что depen_on_past принудительно устанавливается в True везде, где используется wait_for_downstream.

Наконец, обратите внимание, что важно, чтобы опция max_active_runs = 1 активируется, потому что в другом случае одна и та же задача может запускаться одновременно при нескольких последующих запусках.

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'wait_for_downstream': False,
    'start_date': datetime(2019, 7, 20),
}

dag = DAG(
    dag_id='test_v8',
    default_args=args,
    schedule_interval='* * * * *',
    catchup=False,
    max_active_runs=1

)

from time import sleep


def sleep_1():
    sleep(1)


def sleep_2():
    sleep(2)


sleep_2 = PythonOperator(
    task_id='sleep_2',
    python_callable=sleep_2,
    dag=dag,
)

sleep_1 = PythonOperator(
    task_id='sleep_1',
    python_callable=sleep_1,
    dag=dag,
)

sleep_1 >> sleep_2

Наконец то сработало!

enter image description here

0 голосов
/ 30 августа 2018

У вас depends_on_past установлено значение True, что препятствует запуску следующего DagRun.

Из документов : depen_on_past (bool) - если задано значение true, экземпляры задач будут выполняться последовательно, полагаясь на расписание предыдущей задачи. Экземпляр задачи для start_date разрешен к запуску.

Это означает, что ваш Dag пытается запустить, но ждет, пока соответствующее задание из предыдущего DagRun не получит состояние успеха.

...