Как перезапустить дага, если он не работает на воздушном потоке 1.8? - PullRequest
0 голосов
/ 11 июня 2018

С помощью:

default_args = {
    ...
    'retries': 1,
    'retry_delay': timedelta (seconds = 1),
    ...
} 

Я могу получить задачу, которую не удается повторить несколько раз, но как я могу получить ее, если задача не выполняется, DAG запускается снова?

Конечно, автоматически ...

Ответы [ 2 ]

0 голосов
/ 12 июня 2018

Вы можете запустить вторую группу обеспечения доступности баз данных «Fail Check», которая запрашивает любые экземпляры задач, где task_id соответствует желаемому состоянию и состояние failed с использованием утилиты provide_session.Затем вы также можете по желанию очистить последующие задачи и установить для состояния DagRun значение running.

from datetime import datetime, timedelta
from sqlalchemy import and_
import json

from airflow import DAG
from airflow.models import TaskInstance, DagRun
from airflow.utils.db import provide_session

from airflow.operators.python_operator import PythonOperator


default_args = {'start_date': datetime(2018, 6, 11),
                'retries': 2,
                'retry_delay': timedelta(minutes=2),
                'email': [],
                'email_on_failure': True}


dag = DAG('__RESET__FAILED_TASKS',
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False
          )


@provide_session
def check_py(session=None, **kwargs):

    relevant_task_id = 'relevant_task_id'

    obj = (session
           .query(TaskInstance)
           .filter(and_(TaskInstance.task_id == relevant_task_id,
                        TaskInstance.state == 'failed'))
           .all())

    if obj is None:
        raise KeyError('No failed Task Instances of {} exist.'.format(relevant_task_id))
    else:
        # Clear the relevant tasks.
        (session
         .query(TaskInstance)
         .filter(and_(TaskInstance.task_id == relevant_task_id,
                      TaskInstance.state == 'failed'))
         .delete())

        # Clear downstream tasks and set relevant DAG state to RUNNING
        for _ in obj:
            _ = json.loads(_.val)

            # OPTIONAL: Clear downstream tasks in the specified Dag Run.
            for task in _['downstream_tasks']:
                (session
                 .query(TaskInstance)
                 .filter(and_(TaskInstance.task_id == task,
                              TaskInstance.dag_id == _['dag_id'],
                              TaskInstance.execution_date == datetime.strptime(_['ts'],
                                                                                "%Y-%m-%dT%H:%M:%S")))
                 .delete())

            # Set the Dag Run state to "running"
            dag_run = (session
                       .query(DagRun)
                       .filter(and_(DagRun.dag_id == _['dag_id'],
                                    DagRun.execution_date == datetime.strptime(_['ts'],
                                                                               "%Y-%m-%dT%H:%M:%S")))
                       .first())

            dag_run.set_state('running')

with dag:

    run_check = PythonOperator(task_id='run_check',
                               python_callable=check_py,
                               provide_context=True)

    run_check
.
0 голосов
/ 11 июня 2018

Вы могли бы потенциально использовать функцию on_failure_callback для вызова сценария python / bash, который перезапустил бы DAG.В настоящее время Airflow не предоставляет функцию автоматического перезапуска группы обеспечения доступности баз данных при сбое задачи.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...