В настоящее время я настраиваю среду воздушного потока на AWS. Я хотел бы загрузить все даги на веб-сервер как из моей разработки, так и из основной ветки. Я синхронизирую c их при изменениях в двух разных бакетах S3, которые я подключаю к экземпляру EC2. Моя идея состоит в том, чтобы загружать даги с помощью загрузчика дэгов, как описано здесь для загрузки дагов из двух разных папок. Два различия guish между ними на веб-сервере, каждый файл python проверяет свою текущую папку и устанавливает владельца в default_args для сцены. Кроме того, этап устанавливается как очередь, и соответствующий рабочий процесс берет его.
Проблема заключается в реализации: загрузка дагов работает, но если я импортирую внешний файл python из подпапки в даге, который будет выполнить там, я получаю сообщение об ошибке, что модуль не может быть найден. Я предполагаю, что это связано с тем, что загрузчик dag загружает dags, а текущий рабочий каталог - это домашняя папка воздушного потока. Внешние файлы python, загруженные в даги, по-прежнему находятся в других папках, подключенных из s3.
Как вы можете видеть в Dockerfile, домашняя папка воздушного потока:
/usr/local/airflow
Моя структура кода в предыдущем каталоге выглядит следующим образом:
/dags
add_dag_bag.py
/stages/staging/dags
test_dag.py
/scripts
external_test.py
/stages/production/dags
test_dag.py
/scripts
external_test.py
add_dag_bag.py выглядит следующим образом:
from airflow.models import DagBag
import os
dags_dirs = ['/usr/local/airflow/stages/staging/dags', '/usr/local/airflow/stages/production/dags']
for dir in dags_dirs:
dag_bag = DagBag(os.path.expanduser(dir))
if dag_bag:
for dag_id, dag in dag_bag.dags.items():
print(dag_id)
globals()[dag_id] = dag
В test_dag.py
я импортирую некоторые python вызывается из папки сценариев, например from scripts.external_test import callable
.
Даг не загружается в веб-интерфейсе, в то время как я вижу следующую ошибку в журналах: No module named scripts.external_test
Любые идеи, как чтобы решить эту проблему?