Динамические задачи пропускаются в Airflow DAG - PullRequest
0 голосов
/ 09 января 2019

Ниже приведена простая копия созданной мной группы DAG. Группа обеспечения доступности баз данных имеет оператор ветвления для выбора потока выполнения, который объединяется в общую задачу. Задача должна сгенерировать список файлов, который будет использоваться для создания задачи для каждой записи в файле списка. Проблема в том, что я не могу выполнить динамические задачи.

"""
Required packages to execute DAG
"""
from __future__ import print_function
from builtins import range
import airflow
from airflow.models import DAG

from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

import os
import sys


# DAG parameters

args = {
    'owner': 'AD',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 30),
    'end_date': datetime(9999, 12, 31),
    'dagrun_timeout': None,
    'timeout': None,
    'execution_timeout': None,
    'provide_context': True,
}

# create DAG object with Name and default_args (args can set in DAG definition or while execution/runtime)
dag = DAG('sodag', schedule_interval=None, default_args=args)


# Define task - below are examples of tasks created by instantiated by PythonOperator- calling methods written in other py clas
start = DummyOperator(task_id='start', dag=dag)
dummyjoin = DummyOperator(task_id='dummyjoin', dag=dag, trigger_rule=TriggerRule.ONE_SUCCESS)
multidummy = DummyOperator(task_id='multidummy', dag=dag)


def identify_pre_process(**context):
    return 'task1'


def xcl_preq(filename, **kwargs):
    return BashOperator(
            task_id="so_dag{}".format(filename),
            trigger_rule=TriggerRule.ONE_SUCCESS,
            provide_context=True,
            bash_command='echo "executing branch tasks"',
            dag=dag)


with dag:
    router = BranchPythonOperator(task_id='trigger_pre_process',
                                  python_callable=identify_pre_process,
                                  dag=dag)

    task1 = BashOperator(
                    task_id="task1",
                    bash_command='echo "executing task1"',
                    execution_timeout=None,
                    dag=dag)

    task2 = BashOperator(
                    task_id="task2",
                    bash_command='echo "executing task2"',
                    execution_timeout=None,
                    dag=dag)

with open('/root/filelist.txt', 'r') as fp:
    for file in fp:
        filename = os.path.basename(file)
        dummyjoin >> xcl_preq(filename) >> multidummy


start >> router
router >> task1 >> dummyjoin
router >> task2 >> dummyjoin

enter image description here

1 Ответ

0 голосов
/ 10 января 2019

Здесь причина не в том, что задачи генерируются динамически, а в том, что это сложнее. Ваш DAG работает очень хорошо, за исключением следующих тонких вещей: в строке

filename = os.path.basename(file)

переменная filename будет содержать специальный символ новой строки \n. В вашем примере filename будет принимать значения file\n, file1\n, file2\n. Это приводит к тому, что эти задачи не запускаются, поскольку специальные символы, по-видимому, не допускаются в качестве значений для task_id (я согласен, странно, что во время компиляции DAG не возникает никаких ошибок). Вы не видите в графическом представлении DAG в пользовательском интерфейсе, потому что символы новой строки там не отображаются, но если вы щелкнете по Подробности DAG, проблема станет видимой.

Dag details screenshot

Простое исправление заключается в удалении символов новой строки из строк после чтения из файла, то есть

filename = os.path.basename(file.rstrip())

Успех!

successful DAG screenshot

...