Express зависимость задачи воздушного потока от задачи, которая не является прямым родителем - PullRequest
1 голос
/ 10 февраля 2020

У меня довольно простой c рабочий процесс воздушного потока и одно маленькое препятствие, которое я не могу преодолеть. Поэтому моей целью было бы иметь две группы задач. Сначала следует запустить первую группу, затем следующую. Проблема заключается в том, что в группе 2 мне нужно создать зависимости для задач из группы 1. Если определенная задача в группе 1 не выполнена, мы можем пропустить ее зависимость из группы 2.

Итак, это макет Я легко могу достичь: enter image description here

dag = DAG(
    'Example',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)


def get_task(name):
    return BashOperator(
        task_id=name,
        bash_command='date',
        dag=dag,
        trigger_rule=TriggerRule.ALL_DONE
    )


t = [get_task(f'Task_{n+1}') for n in range(3)]
d = [get_task(f'Should_depend_on_task_{n+1}') for n in range(3)]

start = get_task('start')
intermediate = get_task('wait_for_1_2_3')
intermediate2 = get_task('wait_for_4_5_6')
end = get_task('end')

start >> t >> intermediate >> d >> intermediate2 >> end

Но мне действительно нужно что-то подобное (розовый означает пропуск): enter image description here

Так что я ищу способ динамически проектировать такие зависимости. Или я думаю, что мне действительно нужно было бы динамически сказать, что я хочу пропустить задачу. Любая помощь высоко ценится. Я думал об использовании XCOM, но тогда я понятия не имею, как пропустить задачу во время выполнения. Можно было бы использовать PythonBranchOperator, но мне нужно было бы добавить его к каждой задаче, которая кажется немного сложной.

1 Ответ

0 голосов
/ 09 марта 2020

Итак, я разработал этот миксин, который позволяет установить состояние задачи, которое будет пропущено, если ни одна из задач, от которых она зависит, не будет выполнена успешно:

from airflow.exceptions import AirflowSkipException
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


class SkippableOperatorMixin:
    """
    This mixin provides extends an Airflow operators functionality with
    the ability to skip its execution if any of the tasks it depends on did not succeed.
    """
    depends_on_tasks = []

    def __init__(self, depends_on_tasks=[], **kwargs):
        self.depends_on_tasks = depends_on_tasks
        super().__init__(**kwargs)

    def _get_subdag_id_and_task_id(self, task_id):
        """
        Gets the subdag_id and task_id from the task_id

        Example:
          Task1 => (None, 'Task1')
          MainDag.SubDag1.Task1 => ('MainDag.SubDag1', 'Task1')
        """
        task_arr = task_id.split('.')
        return '.'.join(task_arr[:-1]), task_arr[-1]

    @provide_session
    def current_state(self, task_id, execution_date, session=None):
        """
        Get the very latest state of a task identified by the task_id and execution_date from the database.
        """
        TI = TaskInstance
        dag_id, task_id = self._get_subdag_id_and_task_id(task_id)

        qry = session.query(TI).filter(
            TI.task_id == task_id,
            TI.execution_date == execution_date,
        )

        if dag_id:
            qry = qry.filter(TI.dag_id == dag_id)

        ti = qry.all()

        if ti:
            state = ti[0].state
        else:
            state = None
        return state

    def pre_execute(self, context):
        execution_date = context['execution_date']
        skip_task = False
        prerequisite_states = [self.current_state(task_id, execution_date) for task_id in self.depends_on_tasks]
        for state in prerequisite_states:
            if state != State.SUCCESS:
                skip_task = True

        if skip_task:
            raise AirflowSkipException

Пример использования:

class MySkippableOperator(SkippableOperatorMixin, MyOperator):
    pass


task = MySkippableOperator(
    # All the properties you would pass to YourOperator
    depends_on_tasks = ['TaskId1', 'TaskId2']
)
...