Могу ли я программно определить, была ли запланирована группа Airflow DAG или она была запущена вручную? - PullRequest
2 голосов
/ 05 февраля 2020

Я хочу создать фрагмент, который передает правильную дату в зависимости от того, была ли запланирована группа доступности базы данных или она была запущена вручную. DAG работает ежемесячно. Группа обеспечения доступности баз данных создает отчет (запрос A SQL) на основе данных за предыдущий месяц.

Если я запускаю запланированную группу обеспечения доступности баз данных, я могу получить предыдущий месяц с помощью следующего фрагмента кода дзиндзя:

execution_date.month

, учитывая, что группа обеспечения доступности баз данных запланирована на конец предыдущего периода (прошлого месяца), дата выполнения вернёт корректно последний месяц. Однако при ручном запуске это вернет текущий месяц (дата выполнения будет датой ручного триггера).

Я хочу написать простой макрос, который имеет дело с этим случаем. Однако я не смог найти хороший способ программно запросить, запускается ли DAG программно. Лучшее, что я могу придумать, - это извлечь run_id из базы данных (путем создания макроса, который имеет сеанс БД), проверить, содержит ли run_id слово manual. Есть ли лучший способ решить эту проблему?

Ответы [ 2 ]

3 голосов
/ 05 февраля 2020

На данный момент нет прямого свойства DAG для идентификации ручных запусков. Чтобы получить эту информацию, вам нужно будет пометить run_id, как вы упомянули.

Однако есть специальный макрос get run_id. Вам не нужно извлекать его из базы данных самостоятельно. Вот пример того, как его использовать:

    def some_task_py(**context):
        run_id = context['templates_dict']['run_id']
        is_manual = run_id.startswith('manual__')
        is_scheduled = run_id.startswith('scheduled__')


    some_task = PythonOperator(
                task_id = 'some_task',
                dag=dag,
                templates_dict = {'run_id': '{{ run_id }}'},
                python_callable = some_task_py,
                provide_context = True)
2 голосов
/ 02 мая 2020

tl; dr: Вы можете определить это с помощью DagRun.external_trigger.


Я заметил, что в древовидном представлении есть план вокруг прогонов, которые запланированы, но не руководство. Это связано с тем, что к последнему приложено stroke-opacity: 0; в CSS.

При поиске в репозитории я обнаружил, как Разработчики воздушного потока обнаруживают ручные запуски (5-летняя линия, поэтому должен работать в более старая версия):

.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})

Поиск external_trigger приводит нас к DagRun определению .

Так что, если вы используете, например, Python обратный вызов, вы можете иметь что-то вроде этого (может быть определено в DAG или в отдельном файле):

def my_fun(context):
    if context.get('dag_run').external_trigger:
        print('manual run')
    else:
        print('scheduled run')

и в вашем Operator установить параметр как:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    on_failure_callback=my_fun,
    dag=dag,
)

Я проверил что-то подобное, и оно работает.

Я думаю, что вы также можете сделать что-то вроде, если if {{ dag_run.external_trigger }}: - но я не проверял это, и я считаю, что это будет работать только в этом Файл DAG.

...