Как я могу запустить все задачи upstream_failed для «повторной попытки» после того, как состояние их родителей изменилось на успех в Airflow? - PullRequest
0 голосов
/ 27 декабря 2018

У меня есть простой пример, показывающий DAG с двумя уровнями.При запуске этот даг не выполняется, потому что искусственная ошибка, есть одна задача failed и одна задача upstream_fail.

bug = True 

def process1(param):
    print("process 1 running {}".format(param))
    if bug and (param == 2):
        raise Exception("failure!!")


def process2(param):
    print("process 2 running {}".format(param))


with dag:
    for i in range(10):
        task1 = PythonOperator(
            task_id="process_1_{}".format(i),
            python_callable=process1,
            op_kwargs={'param': i}
        )
        task2 = PythonOperator(
            task_id="process_2_{}".format(i),
            python_callable=process2,
            op_kwargs={'param': i},
            trigger_rule=TriggerRule.ALL_SUCCESS,
            retries=2
        )
        task1 >> task2

Теперь давайте предположим, что я исправил ошибку (bug = False) и попытался очистить все неудачные задачи:

airflow clear -s 2001 -e 2019 --only_failed test_resubmit

Эта команда очищает задачу test_resubmit.process_1_2 и она будет запущенауспешно, однако его нисходящий поток (т. е. test_resubmit.process_2_2) все еще находится в состоянии upstream_failed.Как мне запустить все задачи upstream_failed для «повторной попытки» после того, как состояние их родителей изменилось на успешное?

1 Ответ

0 голосов
/ 27 декабря 2018

Состояние upstream_failed является конечным состоянием, поэтому оно не будет повторяться, даже если его зависимости теперь выполнены (в отличие от up_for_retry).Вы захотите передать --downstream, чтобы задачи после невыполненных задач также очищались.

Просмотреть все опции в https://airflow.readthedocs.io/en/stable/cli.html#clear.

...