Задачи воздушного потока после BranchPythonOperator неожиданно пропускаются - PullRequest
4 голосов
/ 15 марта 2019

Мой даг определяется как ниже.Хотя flag1 и flag2 оба y, их как-то пропустили.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
import pandas as pd
from itertools import compress



default_args = {
    'owner': 'alex'
    , 'retries': 2
    , 'retry_delay': timedelta(minutes=15)
    , 'depends_on_past': False
    , 'start_date': datetime(2018, 11, 22)
}

dag = DAG(
    'test_dag'
    , catchup = False
    , default_args = default_args
    , schedule_interval = '@daily'
)

task1 = DummyOperator(
        task_id='task1',
        dag=dag,
    )

task2 = DummyOperator(
        task_id='task2',
        dag=dag,
    )

task3 = DummyOperator(
        task_id='task3',
        dag=dag,
    )


# 1 means yes, 0 means no
flag1 = 'y'
flag2 = 'y'
flag3 = 'y'

tasks_name = ['task1', 'task2', 'task3']
flags = [flag1, flag2, flag3]


def generate_branches(tasks_name, flags):
    res = []
    idx = 1
    root_name = 'switch'
    for sub_task, sub_flag in zip(tasks_name, flags):
        tmp_branch_operator = BranchPythonOperator(
            task_id=root_name+str(idx), # switch1, switch2, ...
            python_callable= lambda: sub_task if sub_flag == 'y' else 'None',
            dag=dag,
        )
        res.append(tmp_branch_operator)
        idx += 1
    return res


def set_dependencies(switches, transfer_operators):
    for sub_switch, sub_transfer_operator in zip(switches, transfer_operators):
        sub_switch.set_downstream(sub_transfer_operator)


transfer_operators = [task1, task2, task3]
gen_branches_op = generate_branches(tasks_name, flags)
set_dependencies(gen_branches_op, transfer_operators)

enter image description here

1 Ответ

0 голосов
/ 17 марта 2019

Эта проблема вызвана поздним связыванием лямбды. Поскольку лямбда вычисляется при вызове, поэтому каждый раз, когда ваша лямбда всегда возвращает последний элемент из списка, то есть task3.

Если вы можете просмотреть журналы switch1 и switch2, то обнаружите, что они имеют следующую ветвь task3 вместо task1 и task2 соответственно.

Чтобы избежать этого, вы можете принудительно оценить лямбду в то время, когда она определена, изменив значение python_callable в generate_branches():

def generate_branches(tasks_name, flags):
    res = []
    idx = 1
    root_name = 'switch'
    for sub_task, sub_flag in zip(tasks_name, flags):
        tmp_branch_operator = BranchPythonOperator(
            task_id=root_name+str(idx), # switch1, switch2, ...
            python_callable=lambda sub_task=sub_task: sub_task if sub_flag == "y", else "None"
            dag=dag,
        )
        res.append(tmp_branch_operator)
        idx += 1
    return res
...