Вы можете запустить вторую группу обеспечения доступности баз данных «Fail Check», которая запрашивает любые экземпляры задач, где task_id
соответствует желаемому состоянию и состояние failed
с использованием утилиты provide_session
.Затем вы также можете по желанию очистить последующие задачи и установить для состояния DagRun
значение running
.
from datetime import datetime, timedelta
from sqlalchemy import and_
import json
from airflow import DAG
from airflow.models import TaskInstance, DagRun
from airflow.utils.db import provide_session
from airflow.operators.python_operator import PythonOperator
default_args = {'start_date': datetime(2018, 6, 11),
'retries': 2,
'retry_delay': timedelta(minutes=2),
'email': [],
'email_on_failure': True}
dag = DAG('__RESET__FAILED_TASKS',
default_args=default_args,
schedule_interval='@daily',
catchup=False
)
@provide_session
def check_py(session=None, **kwargs):
relevant_task_id = 'relevant_task_id'
obj = (session
.query(TaskInstance)
.filter(and_(TaskInstance.task_id == relevant_task_id,
TaskInstance.state == 'failed'))
.all())
if obj is None:
raise KeyError('No failed Task Instances of {} exist.'.format(relevant_task_id))
else:
# Clear the relevant tasks.
(session
.query(TaskInstance)
.filter(and_(TaskInstance.task_id == relevant_task_id,
TaskInstance.state == 'failed'))
.delete())
# Clear downstream tasks and set relevant DAG state to RUNNING
for _ in obj:
_ = json.loads(_.val)
# OPTIONAL: Clear downstream tasks in the specified Dag Run.
for task in _['downstream_tasks']:
(session
.query(TaskInstance)
.filter(and_(TaskInstance.task_id == task,
TaskInstance.dag_id == _['dag_id'],
TaskInstance.execution_date == datetime.strptime(_['ts'],
"%Y-%m-%dT%H:%M:%S")))
.delete())
# Set the Dag Run state to "running"
dag_run = (session
.query(DagRun)
.filter(and_(DagRun.dag_id == _['dag_id'],
DagRun.execution_date == datetime.strptime(_['ts'],
"%Y-%m-%dT%H:%M:%S")))
.first())
dag_run.set_state('running')
with dag:
run_check = PythonOperator(task_id='run_check',
python_callable=check_py,
provide_context=True)
run_check
.