Как найти количество задач, вышедших из строя, не удалось в Airflow? - PullRequest
0 голосов
/ 30 мая 2018

Мне сложно разобраться, как найти невыполненную задачу для того же прогона с запуском, выполняющего дважды в тот же день ( в тот же день выполнения ).

Рассмотрим пример, когда ошибка с dag_id=1 не удалась при первом запуске (возможно, по любой причине, допустим, время ожидания соединения) и задача не выполнена.Таблица TaskInstance будет содержать запись невыполненной задачи, когда мы попытаемся выполнить запрос.БОЛЬШОЙ !!

Но, если я перезапущу тот же dag (обратите внимание, что dag_id по-прежнему равен 1), то в последнем задании (оно имеет правило ALL_DONE, так что независимо от того, была ли выполнена вышестоящая задачаили был успешным, он будет выполнен) Я хочу рассчитать количество задач, которые не были выполнены в текущем dag_run, игнорируя предыдущие dag_runs.Я наткнулся на идентификатор dag_run, который может быть полезен, если мы можем связать его с TaskInstance, но я не смог.Любые предложения / помощь приветствуется.

1 Ответ

0 голосов
/ 31 мая 2018

Вы можете создать задачу PythonOperator, которая запрашивает базу данных Airflow, чтобы найти информацию, которую вы ищете.Это дает дополнительное преимущество, передавая информацию, которая вам необходима для запроса нужных вам данных:

from contextlib import closing
from airflow import models, settings
from airflow.utils.state import State

def your_python_operator_callable(**context):    
  with closing(settings.Session()) as session:
    print("There are {} failed tasks in this execution".format(
      session.query(
        models.TaskInstance
      ).filter(
        models.TaskInstance.dag_id == context["dag"].dag_id, 
        models.TaskInstance.execution_date == context["execution_date"],
        models.TaskInstance.state == State.FAILED).count()
      )

Затем добавьте задачу к вашему DAG с помощью PythonOperator.

( Я не проверял вышеизложенное, но, надеюсь, отправит вас по правильному пути )

...