У меня есть простой пример, показывающий DAG с двумя уровнями.При запуске этот даг не выполняется, потому что искусственная ошибка, есть одна задача failed
и одна задача upstream_fail
.
bug = True
def process1(param):
print("process 1 running {}".format(param))
if bug and (param == 2):
raise Exception("failure!!")
def process2(param):
print("process 2 running {}".format(param))
with dag:
for i in range(10):
task1 = PythonOperator(
task_id="process_1_{}".format(i),
python_callable=process1,
op_kwargs={'param': i}
)
task2 = PythonOperator(
task_id="process_2_{}".format(i),
python_callable=process2,
op_kwargs={'param': i},
trigger_rule=TriggerRule.ALL_SUCCESS,
retries=2
)
task1 >> task2
Теперь давайте предположим, что я исправил ошибку (bug = False
) и попытался очистить все неудачные задачи:
airflow clear -s 2001 -e 2019 --only_failed test_resubmit
Эта команда очищает задачу test_resubmit.process_1_2
и она будет запущенауспешно, однако его нисходящий поток (т. е. test_resubmit.process_2_2
) все еще находится в состоянии upstream_failed
.Как мне запустить все задачи upstream_failed для «повторной попытки» после того, как состояние их родителей изменилось на успешное?