Может сбой повторной попытки задачи DAG Airflow с измененным параметром - PullRequest
0 голосов
/ 16 января 2019

Можно ли с помощью Airflow перезапустить задачу вверх по потоку, если задача внизу не выполняется? Это, кажется, противоречит «ациклической» части термина DAG. Я думаю, что это общая проблема.

Фон

Я изучаю использование Airflow для управления рабочим процессом обработки данных, который управлялся вручную.

Существует задача, которая не будет выполнена, если параметр x установлен слишком высоко, но увеличение значения параметра дает более качественные результаты. Мы не нашли способ рассчитать безопасный, но максимально высокий параметр x. Ручной процесс состоял в том, чтобы перезапустить задание в случае сбоя с более низким параметром, пока оно не заработало.

Рабочий процесс выглядит примерно так:

Задача A - Сбор необработанных данных

Задача B - Создать файл конфигурации для задания

Задача C - Изменить параметр файла конфигурации x

Задача D - Запуск задания по обработке данных

Задача E - Обработка результатов работы

Задача F - Создание отчетов

Выпуск

Если задача D не выполнена из-за слишком высокого параметра x, я хочу повторно запустить задачу C и задачу D. Похоже, это не поддерживается. Я был бы очень признателен за советы о том, как справиться с этим.

1 Ответ

0 голосов
/ 17 января 2019

Прежде всего: это отличный вопрос, интересно, почему он до сих пор широко не обсуждался


Я могу придумать два возможных подхода

  1. Фьюзинг Operators: Как указано @ Kris , Объединение Operators вместе представляется наиболее очевидный обходной путь

  2. Отдельно Верхний уровень DAG с : см. Ниже


Подход отдельных DAG верхнего уровня

Учитывая

  • Скажем, у вас есть задачи A & B
  • A вверх по течению до B
  • Вы хотите возобновить выполнение (повторить попытку) с A, если B не удается

(Возможно) Идея: Если вы чувствуете себя авантюрным

  • Поместите задачи A & B в отдельные верхний уровень DAG s, скажем, DAG-A и DAG-B
  • В конце DAG-A активируйте DAG-B, используя TriggerDagRunOperator
    • По всей вероятности, вам также придется использовать ExternalTaskSensor после TriggerDagRunOperator
  • В DAG-B поставьте BranchPythonOperator после Task-B с trigger_rule=all_done
  • Этот BranchPythonOperator должен перейти на другой TriggerDagRunOperator, который затем вызывает DAG-A (снова!)

Полезные ссылки


EDIT-1

Вот гораздо более простой способ достижения подобного поведения.

Как перезапустить задачу вверх по течению, если задача потока ниже не выполняется в Airflow (с использованием вспомогательных пакетов)

...