Столкновение с воздушными потоками (zip) конфликтует, когда подпапки имеют одинаковое имя - PullRequest
1 голос
/ 25 июня 2019

Мы настраиваем инфраструктуру Airflow, в которой несколько групп специалистов по обработке данных могут управлять своими конвейерами обработки данных. Мы разработали кодовую базу Python, чтобы помочь им реализовать группы обеспечения доступности баз данных, которые включают функции и классы (также подклассы операторов) в различных пакетах и ​​модулях.

У каждой команды будет свой DAG, упакованный в ZIP-файл, вместе с функциями и классами в пакетах. Например, первый ZIP-файл будет содержать

ZIP1:

main_dag_teamA.py

вложенная папка1: package1-with-generic-functions + init .py

подпапка2: package2-with-generic-operator + init .py

И еще один ZIP-файл будет содержать

ZIP2:

main_dag_teamB.py

вложенная папка1: package1-with-generic-functions + init .py

подпапка2: package2-with-generic-operator + init .py

Обратите внимание, что в обоих файлах ZIP подпапка 1 и подпапка 2 обычно будут совершенно одинаковыми, то есть будут иметь одинаковые файлы с одинаковыми функциями и классами. Но со временем, когда станут доступны новые версии пакетов, содержимое пакета начнет отклоняться по пакетам DAG.

С этой настройкой мы сталкиваемся со следующей проблемой: кажется, что Airflow не очень хорошо обрабатывает пакеты с одинаковыми именами, когда содержимое пакетов / подпапок начинает отклоняться по ZIP. Потому что когда я запускаю «airflow list_dags», он показывает такие ошибки, как:

Файл "/data/share/airflow/dags/program1/program1.zip/program1.py", строка 1, в> из подпапки1.functions1, функция импорта1 ImportError: Нет модуля с именем 'subfolder1.functions1'

Проблема может быть воспроизведена с помощью следующего кода, где две маленькие группы доступности баз данных находятся в своих ZIP-файлах вместе с пакетом my_functions, который имеет то же имя, но разное содержимое.

DAG пакет ZIP 1:

program1.py

from my_functions.functions1 import function1

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)

my_functions / functions1.py:

def function1():
    print('function1')

Пакет DAG ZIP 2:

program2.py:

from my_functions.functions2 import function2

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)

my_functions / functions2.py:

def function2():
    print('function2')

С этими двумя файлами ZIP, когда я запускаю "список_отходов воздушного потока", появляется ошибка:

Файл "/data/share/airflow/dags/program1/program1.zip/program1.py", строка 1, в from subfolder1.functions1 import function1 ImportError: Нет модуля с именем 'subfolder1.functions1'

Если содержимое вложенных папок в ZIP-файлах одинаковое, ошибки не возникает.

Мой вопрос: как я могу предотвратить это столкновение подпапок в ZIP? Я действительно хотел бы иметь полностью независимые от кода группы DAG с собственной версией пакетов.

...