Могу ли я узнать, когда группа DAG закончила работу в потоке воздуха? - PullRequest
0 голосов
/ 12 февраля 2020

Я запускаю сценарий, который проверяет состояние моей базы данных перед запуском группы DAG и сравнивает его с после завершения работы группы DAG.

def pre_dag_db
  pass

def run_dag
  pass

def post_dag_db
  pass

Можно ли узнать, когда группа DAG закончил работу, чтобы мой скрипт знал, когда запускать post_dag_db? Идея состоит в том, что мой post_dag_db запускается после того, как мой DAG завершил работу, потому что DAG манипулирует db.

Ответы [ 3 ]

0 голосов
/ 13 февраля 2020

Быстрый и простой способ - добавить одну задачу в DAG, которая будет работать / выполняться как последняя задача DAG, для вас это будет работать как magi c.

вы можете использовать любой оператор лайк (PythonOperator, BashOperator, et c).

0 голосов
/ 13 февраля 2020

Я думаю, вы можете использовать следующий код:

dag = get_dag(args)
dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
print(dr[0].state if len(dr) > 0 else None)

Этот код взят из airflow cli .

0 голосов
/ 12 февраля 2020

Самый простой способ сделать это - просто запустить скрипт как последнюю задачу в вашем dag, возможно, с помощью BashOperator.

Другими вариантами будет запуск отдельного dag (TriggerDagRunOperator) и реализация dag, который вызывает ваш скрипт.

Если вы действительно не можете вызвать свой скрипт из самого Airflow, вы можете проверить API REST https://airflow.apache.org/docs/stable/api.html и использовать их для получения информации о dag_run. Но мне это кажется слишком сложным.

...