Как избежать запуска ранее успешных задач в Airflow? - PullRequest
0 голосов
/ 05 июня 2019

У меня есть несколько задач, которые передают некоторые объекты данных друг другу.В некоторых задачах, если какое-то условие не выполняется, я поднимаю исключение.Это приводит к провалу этой задачи.Когда запускается следующий прогон DAG, уже успешно выполненная задача запускается еще раз.Я нахожу способ избежать запуска ранее успешно выполненных задач и возобновить выполнение группы DAG из невыполненной задачи при следующем запуске DAG.

1 Ответ

1 голос
/ 05 июня 2019

Как уже упоминалось, каждый DAG имеет свой набор задач, которые выполняются при каждом запуске.Чтобы избежать запуска ранее успешных задач, вы можете выполнить проверку внешней переменной с помощью XCOM воздушного потока или переменных воздушного потока , вы также можете запросить базу данных метаданных относительно состояния предыдущегопробеги.Вы также можете сохранить переменную в чем-то вроде Redis или аналогичной внешней базе данных.

Используя эту переменную, вы можете пропустить выполнение задачи и напрямую пометить задачу как успешную, пока она не достигнет задачи, которая должна быть выполнена..

Конечно, вы должны помнить о любых возможных условиях гонки, если время выполнения DAG может перекрываться.

def task_1( **kwargs ):
    if external_variable:
        pass
    else:
        perform_task()
    return True
...