airflow не загружает задачи оператора из файла, отличного от файла DAG - PullRequest
0 голосов
/ 05 ноября 2019

Обычно мы определяем Операторы в том же файле python, где определена наша группа DAG (см. этот базовый пример). Так и я делал то же самое. Но мои задачи сами по себе БОЛЬШИЕ, с использованием пользовательских операторов, поэтому я хотел создать проект структурированной dag полиморфизма, где все такие задачи, использующие один и тот же оператор, находятся в отдельном файле. Для простоты приведу очень простой пример. У меня есть оператор x, имеющий несколько задач. Это структура моего проекта;

main_directory
  ├──tasks
  |  ├──operator_x
  |  |   └──op_x.py
  |  ├──operator_y
  |  :   └──op_y.py
  |   
  └──dag.py 

op_x.py имеет следующий метод;

def prepare_task():
    from main_directory.dag import dag
    t2 = BashOperator(
        task_id='print_inner_date',
        bash_command='date',
        dag=dag)
    return t2

, а файл dag.py содержит следующий код:

from main_directory.tasks.operator_x import prepare_task

default_args = {
    'retries': 5,
    'retry_delay': dt.timedelta(minutes=5),
    'on_failure_callback': gen_email(EMAIL_DISTRO, retry=False),
    'on_retry_callback': gen_email(EMAIL_DISTRO, retry=True),
    'start_date': dt.datetime(2019, 5, 10)
}
dag = DAG('test_dag', default_args=default_args, schedule_interval=dt.timedelta(days=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = prepare_task()

Сейчаскогда я выполняю это в моей среде воздушного потока и запускаю airflow list_dags, я получаю в списке нужный тег с именем test_dag, но когда я выполняю airflow list_tasks -t test_dag, я получаю только одну задачу с идентификатором print_date, а НЕ ту, которая определена внутри подкаталога с помощьюID print_inner_date. Может ли кто-нибудь помочь мне понять, что я скучаю?

1 Ответ

2 голосов
/ 05 ноября 2019

Ваш код будет создавать циклический импорт. Вместо этого попробуйте следующее:

op_x.py должно иметь:

def prepare_task(dag):
    t2 = BashOperator(
        task_id='print_inner_date',
        bash_command='date',
        dag=dag)
    return t2

dag.py:

from main_directory.tasks.operator_x import prepare_task

default_args = {
    'retries': 5,
    'retry_delay': dt.timedelta(minutes=5),
    'on_failure_callback': gen_email(EMAIL_DISTRO, retry=False),
    'on_retry_callback': gen_email(EMAIL_DISTRO, retry=True),
    'start_date': dt.datetime(2019, 5, 10)
}
dag = DAG('test_dag', default_args=default_args, schedule_interval=dt.timedelta(days=1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = prepare_task(dag=dag)

Также убедитесь, что main_directory в вашемPYTHONPATH.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...