Мы настраиваем инфраструктуру Airflow, в которой несколько групп специалистов по обработке данных могут управлять своими конвейерами обработки данных. Мы разработали кодовую базу Python, чтобы помочь им реализовать группы обеспечения доступности баз данных, которые включают функции и классы (также подклассы операторов) в различных пакетах и модулях.
У каждой команды будет свой DAG, упакованный в ZIP-файл, вместе с функциями и классами в пакетах. Например, первый ZIP-файл будет содержать
ZIP1:
main_dag_teamA.py
вложенная папка1: package1-with-generic-functions + init .py
подпапка2: package2-with-generic-operator + init .py
И еще один ZIP-файл будет содержать
ZIP2:
main_dag_teamB.py
вложенная папка1: package1-with-generic-functions + init .py
подпапка2: package2-with-generic-operator + init .py
Обратите внимание, что в обоих файлах ZIP подпапка 1 и подпапка 2 обычно будут совершенно одинаковыми, то есть будут иметь одинаковые файлы с одинаковыми функциями и классами.
Но со временем, когда станут доступны новые версии пакетов, содержимое пакета начнет отклоняться по пакетам DAG.
С этой настройкой мы сталкиваемся со следующей проблемой: кажется, что Airflow не очень хорошо обрабатывает пакеты с одинаковыми именами, когда содержимое пакетов / подпапок начинает отклоняться по ZIP.
Потому что когда я запускаю «airflow list_dags», он показывает такие ошибки, как:
Файл "/data/share/airflow/dags/program1/program1.zip/program1.py", строка 1, в> из подпапки1.functions1, функция импорта1
ImportError: Нет модуля с именем 'subfolder1.functions1'
Проблема может быть воспроизведена с помощью следующего кода, где две маленькие группы доступности баз данных находятся в своих ZIP-файлах вместе с пакетом my_functions, который имеет то же имя, но разное содержимое.
DAG пакет ZIP 1:
program1.py
from my_functions.functions1 import function1
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)
my_functions / functions1.py:
def function1():
print('function1')
Пакет DAG ZIP 2:
program2.py:
from my_functions.functions2 import function2
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)
my_functions / functions2.py:
def function2():
print('function2')
С этими двумя файлами ZIP, когда я запускаю "список_отходов воздушного потока", появляется ошибка:
Файл "/data/share/airflow/dags/program1/program1.zip/program1.py", строка 1, в
from subfolder1.functions1 import function1 ImportError: Нет модуля с именем 'subfolder1.functions1'
Если содержимое вложенных папок в ZIP-файлах одинаковое, ошибки не возникает.
Мой вопрос: как я могу предотвратить это столкновение подпапок в ZIP? Я действительно хотел бы иметь полностью независимые от кода группы DAG с собственной версией пакетов.