Условные задачи с использованием PythonOperator и BranchPythonOperator - PullRequest
0 голосов
/ 04 августа 2020

Привет, ребята, я новичок в Airflow и python. Мне нужно запускать задачи на основе значения переменной на входе json. Если значение переменной « страхование » равно « истина », то нужно запустить task1, task2, task3, иначе нужно запустить task4, task5, task6. Так как я новичок ie в этом, я мало знаю об использовании PythonOperator и BranchPythonOperator.

Это мой ввод json:

{
  "car": {
    "engine_no": "123_st_456",
    "json": "{\"make\":\"Honda\",\"model\": Jazz, \"insurance\":\"true\",\"pollution\":\"true\" }"
  }
}

Код приведен ниже :

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
import logging
import json

default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

dag = DAG('DAG_NAME',default_args=default_args,schedule_interval=None,max_active_runs=5, start_date=datetime(2020, 8, 4))   

PythonOperator(
   task_id = 'sample_task',
   python_callable = 'sample_fun',
   op_kwargs = {
       json  : '{{ dag_run.car.json}}'
   },
   provide_context=True,
   dag = dag
)

def sample_fun( json,**kwargs):
  insurance_flag = json.dumps(json)['insurance']

task1 = BashOperator(
    task_id='task1',
    bash_command='echo 1'
)

task2 = BashOperator(
    task_id='task2',
    bash_command='echo 2'
)

task3 = BashOperator(
    task_id='task3',
    bash_command='echo 3'
) 

task4 = BashOperator(
    task_id='task4',
    bash_command='echo 4'
)  

task5 = BashOperator(
    task_id='task5',
    bash_command='echo 5'
)

task6 = BashOperator(
    task_id='task6',
    bash_command='echo 6'
) 


if insurance_flag == "true":
    task1.dag = dag
    task2.dag = dag
    task3.dag = dag
    task1 >> task2 >> task3
    
else:
    task4.dag = dag
    task5.dag = dag
    task6.dag = dag
    task4 >> task5 >> task6

1 Ответ

0 голосов
/ 05 августа 2020

Основная проблема в вашем коде

  1. Файл dag-definition-file - это , постоянно анализируемый Airflow в фоновом режиме и сгенерированные DAG & задачи выбираются планировщиком. То, как ваш файл связывает задачи вместе, создает несколько проблем

    • все 6 задач (task1 .. task6) ВСЕГДА создаются (и, следовательно, они будет работать всегда, независимо от insurance_flag); просто их межзадачная зависимость установлена ​​в соответствии с insurance_flag

    • , правильный способ вместо этого - поместить оба экземпляр задачи (создание объекта PythonOperator taskn), а также задание внутри этого блока if .. else. Таким образом, ненужные задачи не будут созданы (и, следовательно, не будут запускаться)

  2. Хотя одного пункта 1 выше должно быть достаточно, чтобы исправить ваш кода, могу ли я предложить вам предложение по улучшению: наличие Variable, читаемого в файле определения dag , означает, что запрос SQL запускается Airflow SQLAlchemy ORM очень часто в background (каждый цикл непрерывного синтаксического анализа файла определения dag)

    • это не только излишне перегружает ваш бэкэнд SQLAlchemy meta-db, но и замедляет синтаксический анализатор (в крайнем случае может привести к Таймаут DagBag , если синтаксический анализ начинает занимать слишком много времени)
    • вместо этого вы можете использовать BranchPythonOperator правильным образом , чтобы переместить это значение переменной на время выполнения ( когда DAG / задачи будут фактически запускаться), а не Время генерации DAG (когда dag-файл анализируется Airflow, а DAG генерируется на веб-сервере); вот код для этого (и вам следует полностью отказаться от этого блока if-else)
    """ branch 1 """
    task1 >> task2 >> task3
    """ branch 2 """
    task4 >> task5 >> task6
    
    def branch_decider(**kwargs):
        my_var_dict = Variable.get('my_var_name', deserialize_json=True)
        # decide which branch to take based on insurance flag
        if my_var_dict['car']['json']['insurance']:
            return 'task1'
        else:
            return 'task4'
    
    branch_task = BranchPythonOperator(task_id='branch_task',
                                       dag=dag,
                                       python_callable=branch_decider)
    

Другие (незначительные) проблемы в вашем коде

  • Отсутствует обязательный dag аргумент из task экземпляров

       task1 = BashOperator(
         task_id='task1',
         bash_command='echo 1',
         dag=dag
       )
    
  • a dagling PythonOperator с callable which json.dump s Переменная, которая не решает никаких задач (если я неправильно понял ваш код / ​​намерение, удалите его полностью)

    PythonOperator(
        task_id='sample_task',
        python_callable=sample_fun,
        op_kwargs={
            json: '{{ dag_run.car.json}}'
        },
        provide_context=True,
        dag=dag
    )
    
    
    def sample_fun(json, **kwargs):
        insurance_flag = json.dumps(json)['insurance']
    

UPDATE-1

Ответ на запросы, поднятые над комментариями

Мы использовали Variable.get (my_ var_ name) . Что это за my_ var_ name

Переменные имеют key & value, my_var_name - это key переменной (см. Столбец Key в следующий снимок экрана из пользовательского интерфейса Airflow)

Airflow Variable UI


If condition satisfies return 'task1', 'task2', 'task3' else 'task4', 'task5', 'task6'. Can we add more than 1 tasks in return

No you can't. (you don't have to)

BranchPythonOperator requires that it's python_callable should return the task_id of first task of the branch only

  • 1st branch: task1, task2, task3, first task's task_id = task1
  • 2nd branch: task4, task5, task6, first task's task_id = task4

Furthermore do understand that since the above two sets of tasks have already been wired together, so they will be naturally executed after one-another in that sequence (otherwise what would be the point of wiring them anyways?)

task1 >> task2 >> task3

Check out these links (in addition to links already inlined in answer above)

...