Обычно мы определяем Операторы в том же файле python, где определена наша группа DAG (см. этот базовый пример). Так и я делал то же самое. Но мои задачи сами по себе БОЛЬШИЕ, с использованием пользовательских операторов, поэтому я хотел создать проект структурированной dag полиморфизма, где все такие задачи, использующие один и тот же оператор, находятся в отдельном файле. Для простоты приведу очень простой пример. У меня есть оператор x
, имеющий несколько задач. Это структура моего проекта;
main_directory
├──tasks
| ├──operator_x
| | └──op_x.py
| ├──operator_y
| : └──op_y.py
|
└──dag.py
op_x.py имеет следующий метод;
def prepare_task():
from main_directory.dag import dag
t2 = BashOperator(
task_id='print_inner_date',
bash_command='date',
dag=dag)
return t2
, а файл dag.py содержит следующий код:
from main_directory.tasks.operator_x import prepare_task
default_args = {
'retries': 5,
'retry_delay': dt.timedelta(minutes=5),
'on_failure_callback': gen_email(EMAIL_DISTRO, retry=False),
'on_retry_callback': gen_email(EMAIL_DISTRO, retry=True),
'start_date': dt.datetime(2019, 5, 10)
}
dag = DAG('test_dag', default_args=default_args, schedule_interval=dt.timedelta(days=1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = prepare_task()
Сейчаскогда я выполняю это в моей среде воздушного потока и запускаю airflow list_dags
, я получаю в списке нужный тег с именем test_dag
, но когда я выполняю airflow list_tasks -t test_dag
, я получаю только одну задачу с идентификатором print_date
, а НЕ ту, которая определена внутри подкаталога с помощьюID print_inner_date
. Может ли кто-нибудь помочь мне понять, что я скучаю?