Вы можете использовать SQLAlchemy
magi c для получения execution_date
s против последних n успешных запусков
from pendulum import Pendulum
from typing import List, Dict, Any, Optional
from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
@provide_session
def last_n_execution_dates(dag_id: str,
task_id: str,
n: int,
session: Optional[Session]) -> List[Pendulum]:
task_instances: TaskInstance = (session
.query(TaskInstance)
.filter(TaskInstance.dag_id == dag_id,
TaskInstance.task_id == task_id,
TaskInstance.state == State.SUCCESS)
.order_by(TaskInstance.execution_date.desc())
.limit(n)
.all())
execution_dates: List[Pendulum] = list(map(lambda ti: ti.execution_date, task_instances))
return execution_dates
Обратите внимание, что фрагмент предназначен только для справочных целей и не протестирован
Я сослался на tree()
метод из views.py
для создания этого скрипта.
В качестве альтернативы вы можете запустить этот SQL запрос к мета-базе данных Airflow, чтобы получить последние n дат выполнения с успешными запусками
SELECT execution_date
FROM task_instance
WHERE dag_id = 'my_dag_id'
AND task_id = 'my_task_id'
AND state = 'success'
ORDER BY execution_date DESC
LIMIT n