Как получить время выполнения цепочки DAG в Airflow? - PullRequest
0 голосов
/ 13 сентября 2018

Допустим, у меня есть два DAG, где dag2 выполнил dag1 как часть своего потока, используя TriggerDagRunOperator, следующим образом:

  • dag1: задача1> задача2> задача3
  • dag2: задача4> dag1> задача5

Теперь предположим, что dag2 запланирован на один день в 17:00. Есть ли способ получить метку времени выполнения для dag2 (родительской группы DAG), пока я запускаю dag1? Есть ли встроенный параметр, который содержит это значение?

И если что-то случилось, и dag2 был запущен позже, чем обычно, скажем, в 6 вечера в тот же день, тогда я все еще хочу получить исходное время планирования - это 5 вечера, пока я в dag1.

1 Ответ

0 голосов
/ 13 сентября 2018

Передайте функцию аргументу python_callable в TriggerDagRunOperator, который вводит execution_date в триггер DAG:

def inject_execution_date(context, dag_run_obj):
  dag_run_obj.payload = {"parent_execution_date": context["execution_date"]}
  return dag_run_obj

[...]

trigger_dro = TriggerDagRunOperator(python_callable=inject_execution_date, [...])

Вы можете получить доступ к этому в дочерней DAG с помощью context["conf"]["parent_execution_date"]

...