Как не отображать задачу в DAG на основе условия - PullRequest
1 голос
/ 29 сентября 2019

У меня есть DAG и у меня есть 3 задачи.Я не хотел бы отображать 2-е задание (middle_name) в прогоне DAG в зависимости от условия.например, если middle_name_var == 'false', я не хочу отображать задачу middle_name в DAG.Есть ли способ элегантно добиться этого?

from airflow.operators import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable


middle_name_var = Variable.get('middle_name')
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'email': ['tes@abc.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
dag = DAG(
    'name',
    default_args=default_args,
    schedule_interval="@once")


def first_name():
    print('John')

def middle_name():
    print('Smith')


def last_name():
    print('Doe')


first_name_task = PythonOperator(
    task_id='first_name',
    provide_context=False,
    python_callable=first_name,
    dag=dag
)

middle_name_task = PythonOperator(
    task_id='middle_name',
    provide_context=False,
    python_callable=middle_name,
    dag=dag
)

last_name_task = PythonOperator(
    task_id='last_name',
    provide_context=False,
    python_callable=last_name,
    dag=dag
)

if middle_name_var == 'true':
    first_name_task >> middle_name_task >>last_name_task
else:
    first_name_task >> last_name_task

Мой DAG выглядит так с задачей middle_name ... Но я бы не хотел, чтобы задача middle_name основывалась на middle_name_var, для которого в этом случае установлено значение false..

enter image description here

1 Ответ

1 голос
/ 29 сентября 2019

Что касается последнего набора цепочек операторов

# by the way i believe the comparison expression should be
# middle_name_var == True (boolean rather than string), but lets ignore it for now
if middle_name_var == 'true':
    first_name_task >> middle_name_task >>last_name_task
else:
    first_name_task >> last_name_task

Позвольте мне спросить вас: что произойдет, если вы удалите эти цепочки операторов?Не исчезнут ли задачи из DAG?

Не совсем.


Цепочка просто устанавливает отношения зависимости между задачами.Даже без цепочки ваша задача останется частью вашей DAG (как на скриншоте, который вы опубликовали).

Вот секрет: задача становится частью вашего DAG, как только вы объявляете ее

middle_name_task = PythonOperator(
    task_id='middle_name',
    provide_context=False,
    python_callable=middle_name,
    dag=dag
)

И независимо от того, устанавливаете ли вы эту задачу выше или ниже некоторых других задач, она будет по-прежнему «появляться» в вашей группе обеспечения доступности баз данных.В связи с этим docs

Операторы не должны быть назначены на группы DAG немедленно (ранее dag был обязательным аргументом).Однако после назначения оператора группе обеспечения доступности баз данных она не может быть перенесена или не назначена.Присвоение DAG может быть выполнено в явном виде при создании оператора, с помощью отложенного присвоения или даже на основании других операторов.


  • Q Так чтоВы должны сделать, чтобы «не отображать» задачу?

    A Просто не объявлять (создавать) ее.

  • Q И как бы вы это сделали?

    A Просто переместите объявление задачи в ваше предложение if-else

if middle_name_var == 'true':
    middle_name_task = PythonOperator(
        task_id='middle_name',
        provide_context=False,
        python_callable=middle_name,
        dag=dag
    )
    first_name_task >> middle_name_task >>last_name_task
else:
    first_name_task >> last_name_task
...