Как получить последние две даты успешного выполнения задания Airflow? - PullRequest
2 голосов
/ 08 мая 2020

Мне нужно получить последние две даты успешного выполнения задания Airflow для использования в моем текущем запуске. Пример: Дата выполнения Статус задания 2020-05-03 успех 2020-05-04 сбой 2020-05-05 успех

Вопрос: когда я запускаю свою работу 6 мая, я должен получить значения 3 и 5 мая в переменные. Является ли это возможным?

1 Ответ

1 голос
/ 09 мая 2020

Вы можете использовать SQLAlchemy magi c для получения execution_date s против последних n успешных запусков

from pendulum import Pendulum
from typing import List, Dict, Any, Optional
from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance

@provide_session
def last_n_execution_dates(dag_id: str,
                           task_id: str,
                           n: int,
                           session: Optional[Session]) -> List[Pendulum]:
    task_instances: TaskInstance = (session
                                    .query(TaskInstance)
                                    .filter(TaskInstance.dag_id == dag_id,
                                            TaskInstance.task_id == task_id,
                                            TaskInstance.state == State.SUCCESS)
                                    .order_by(TaskInstance.execution_date.desc())
                                    .limit(n)
                                    .all())
    execution_dates: List[Pendulum] = list(map(lambda ti: ti.execution_date, task_instances))
    return execution_dates

Обратите внимание, что фрагмент предназначен только для справочных целей и не протестирован

Я сослался на tree() метод из views.py для создания этого скрипта.


В качестве альтернативы вы можете запустить этот SQL запрос к мета-базе данных Airflow, чтобы получить последние n дат выполнения с успешными запусками

SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
  AND task_id = 'my_task_id'
  AND state = 'success'
ORDER BY execution_date DESC
LIMIT n
...