Воздушный поток "none_failed" пропускается при пропуске вверх по течению - PullRequest
2 голосов
/ 09 октября 2019

У меня есть рабочий процесс, в котором у меня есть два параллельных процесса (sentinel_run и sentinel_skip), которые должны выполняться или пропускаться в зависимости от условия, а затем объединяться (resolve). Мне нужно, чтобы задачи, расположенные непосредственно после любой задачи sentinel_, каскадно пропускались, но когда она попадает к задаче resolve, resolve должна запускаться, если в каком-либо из процессов выше не происходит сбоев.

На основе документация , должно работать правило триггера none_failed:

none_failed: все родители не потерпели неудачу (отказ или upstream_failed), т.е. все родители преуспели или были пропущены

и это также ответ на связанный вопрос .

Однако, когда я реализовал тривиальный пример, это не то, что я вижу:

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

dag = DAG(
    "testing",
    catchup=False,
    schedule_interval="30 12 * * *",
    default_args={
        "owner": "test@gmail.com",
        "start_date": days_ago(1),
        "catchup": False,
        "retries": 0
    }
)

start = DummyOperator(task_id="start", dag=dag)

sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)

a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)

resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")

start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve

resolve >> g

Этот код создает следующий знак:

DAG Design

Проблема заключается в том, что задача resolved должна выполняться (потому что ничто в восходящем направлении не является либо upstream_failed или failed), но вместо этого он пропускает.

Я изучил базу данных, и нет скрытых сбойных или вышестоящих сбойных задач, и я не могу понять, почему это не будетсоблюдать логику «none_failed».

Я знаю о "некрасивом обходном пути" и реализовал его в других рабочих процессах, но он добавляет еще одну задачу для выполнения и увеличивает сложность, с которой приходится сталкиваться новым пользователям в группе обеспечения доступности баз данных (особенно при умноженииэто несколькими задачами ...). Это было моей основной причиной для обновления с Airflow 1.8 до Airflow 1.10, поэтому я надеюсь, что мне просто не хватает чего-то очевидного ...

1 Ответ

2 голосов
/ 09 октября 2019

Документирование этого, потому что эта проблема укусила меня дважды, и теперь я решил ее дважды.

Анализ проблемы

Когда вы переключаете уровень журнала на DEBUG, вы начинаете видеть, что происходитon:

[2019-10-09 18:30:05,472] {python_operator.py:114} INFO - Done. Returned value was: False
[2019-10-09 18:30:05,472] {python_operator.py:159} INFO - Condition result is False
[2019-10-09 18:30:05,472] {python_operator.py:165} INFO - Skipping downstream tasks...
[2019-10-09 18:30:05,472] {python_operator.py:168} DEBUG - Downstream task_ids [<Task(DummyOperator): f>, <Task(DummyOperator): g>, <Task(DummyOperator): d>, <Task(DummyOperator): resolve>, <Task(DummyOperator): e>]
[2019-10-09 18:30:05,492] {python_operator.py:173} INFO - Done.

Отсюда видно, что проблема не в том, что «none_failed» неправильно обрабатывает задачи, а в том, что сторож, имитирующий условие пропуска, помечает все нисходящие зависимости пропущены напрямую. Это поведение ShortCircuitOperator - пропускаются все нисходящие потоки, включая задачи нисходящего из нисходящих задач.

Решение

Решение этогопроблема заключается в признании того, что именно поведение ShortCircuitOperator, а не TriggerRule, является причиной проблемы. Как только мы это осознаем, пришло время приступить к написанию оператора, более подходящего для задачи, которую мы на самом деле пытаемся выполнить.

Я включил оператор, которым я сейчас пользуюсь;Я приветствовал бы любой вклад в лучший способ справиться с модификацией отдельных последующих задач. Я уверен, что есть лучшая идиома для «пропустить только следующий и позволить остальным каскадировать в соответствии с их правилами триггера», но я уже потратил больше времени, чем хотел, и я подозреваю, что ответ лежит еще глубже вВнутренние органы.

"""Sentinel Operator Plugin"""

import datetime

from airflow import settings
from airflow.models import SkipMixin, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State


class SentinelOperator(PythonOperator, SkipMixin):
    """
    Allows a workflow to continue only if a condition is met. Otherwise, the
    workflow skips cascading downstream to the next time a viable task
    is identified.

    The SentinelOperator is derived from the PythonOperator. It evaluates a
    condition and stops the workflow if the condition is False. Immediate
    downstream tasks are skipped. If the condition is True, downstream tasks
    proceed as normal.

    The condition is determined by the result of `python_callable`.
    """
    def execute(self, context):
        condition = super(SentinelOperator, self).execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        session = settings.Session()

        for task in context['task'].downstream_list:
            ti = TaskInstance(task, execution_date=context['ti'].execution_date)
            self.log.info('Skipping task: %s', ti.task_id)
            ti.state = State.SKIPPED
            ti.start_date = datetime.datetime.now()
            ti.end_date = datetime.datetime.now()
            session.merge(ti)

        session.commit()
        session.close()

        self.log.info("Done.")


class Plugin_SentinelOperator(AirflowPlugin):
    name = "sentinel_operator"
    operators = [SentinelOperator]

С изменениями это дает ожидаемые результаты:

Correct Dag

...