Есть ли способ настроить различные «повторные попытки» для задач в том же DAG - PullRequest
0 голосов
/ 02 мая 2019

У меня есть группа доступности базы данных со многими подзадачами.В середине группы обеспечения доступности баз данных находится задача проверки, и на основе кода результата / возврата из задачи я хочу выбрать два разных пути.В случае успеха будет следовать один маршрут (последовательность задач), а в случае неудачи мы хотели бы выполнить другой набор задач.С текущим подходом есть две проблемы, одна из которых заключается в том, что задачи проверки выполняются многократно (в соответствии с настроенными повторными попытками), если код выхода равен 1. Во-вторых, невозможно получить разные ветви выполнения

Чтобы решить проблему № 1, мы можем использовать номер повторения, доступный из экземпляра задачи, который доступен через макрос {{task_instance}}.Цените, если кто-то может указать нам на более чистый подход, и проблема № 2 в выборе различных путей остается нерешенной.

Ответы [ 2 ]

1 голос
/ 02 мая 2019

Вы можете иметь retries на уровне задач.

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    retries=3,
    dag=dag,
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    retries=1,
    dag=dag,
)

Что касается вашей второй проблемы, существует концепция Ветвление .

enter image description here

BranchPythonOperator очень похож на PythonOperator за исключением того, что он ожидает python_callable, который возвращает task_id (или списокtask_ids).За возвращаемым идентификатором task_id следуют, а все остальные пути пропускаются.Идентификатор task_id, возвращаемый функцией Python, должен ссылаться на задачу непосредственно после задачи BranchPythonOperator.

Пример DAG:

import random

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

run_this_first = DummyOperator(
    task_id='run_this_first',
    dag=dag,
)

options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
    dag=dag,
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule='one_success',
    dag=dag,
)

for option in options:
    t = DummyOperator(
        task_id=option,
        dag=dag,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
        dag=dag,
    )

    branching >> t >> dummy_follow >> join
0 голосов
/ 02 мая 2019

Что касается вашей первой проблемы, вы можете легко установить параметры повторения для задачи / оператора. Ссылка: baseoperator.py # L77 .

Проблема вторая: вы можете легко переходить в DAG с помощью BranchPythonOperator (Пример использования: example_branch_operator.py ). Вы захотите вложить свою задачу / логику проверки в BranchPythonOperator (Вы можете определять и выполнять операторы внутри операторов).

...