Есть ли простой способ пропустить весь прогон DAG в Airflow, но разрешить его запуск в будущем? - PullRequest
0 голосов
/ 29 февраля 2020

В настоящее время у меня есть DAG, которая запускается каждый день в полдень. Теперь мне бы хотелось, чтобы он запускался каждую пятницу и 10-е число каждого месяца, чтобы сэкономить на некоторых затратах, но все-таки он запланирован на ежедневный запуск, чтобы я мог легко вернуться и запустить даг в любой день. (Причиной этого является то, что иногда мне нужны данные, полученные этим дагом в определенный c день, а не в 10-ю и каждую пятницу, но я не буду знать заранее, когда это потребуется).

Одна мысль, которая у меня была, состояла в том, чтобы использовать BranchPythonOperator, который проверял, были ли выполнены критерии и, если да, запускал бы все задачи, которые необходимо запустить, в противном случае он запускает фиктивную задачу, а все остальное пропускается. Основная проблема в том, что эта группа DAG огромна и содержит много задач, распределенных по разным файлам, поэтому я стараюсь убедиться, что все зависит от этой задачи BranchPythonOperator. (Один вопрос по этому поводу, если я просто сделаю BranchPythonOperator автономным, независимо от того, зависит ли от него что-либо, и будет ли возвращать имена задач, которые все еще работают?)

Я подумал, что должен быть более простой способ просто пропустите весь прогон DAG, если критерии не выполнены, не помешая мне запускать эту дату в будущем.

Отредактировано на примере:

partition_sensor_1 -->
partition_sensor_2 -->
.                         first_dag_task
.
.
partition_sensor_15 -->

Так что в этом примере я бы Я хотел бы убедиться, что все эти датчики разделов и остальная часть DAG пропущены. Нужно ли указывать каждый датчик секций как зависимый от оператора Branch python? При настройке этих датчиков секций они часто создаются в разных файлах, поэтому их нелегко установить в зависимости от оператора ветвления. Надеюсь, что это имеет смысл

1 Ответ

0 голосов
/ 29 февраля 2020

Я думаю, что вы думаете о том, что правильно. Однако вам необходимо подключить BranchPythonOperator к первой задаче вашей группы DAG.

Итак, прямо сейчас у вас есть:

first_task >> of >> your >> old >> dag >> definition

, сначала вы хотите определить свой BranchPythonOperator, а затем изменить тег следующим образом:

branch_task >> [dummy_task, first_task]
first_task >> of >> your >> old >> dag >> definition

убедитесь, что branch_task возвращает идентификатор_задачи first_task, когда выполняются условия, в противном случае - идентификатор задачи фиктивной задачи.

...