Управление разными этапами с помощью одного веб-сервера и двух рабочих - PullRequest
0 голосов
/ 07 мая 2020

В настоящее время я настраиваю среду воздушного потока на AWS. Я хотел бы загрузить все даги на веб-сервер как из моей разработки, так и из основной ветки. Я синхронизирую c их при изменениях в двух разных бакетах S3, которые я подключаю к экземпляру EC2. Моя идея состоит в том, чтобы загружать даги с помощью загрузчика дэгов, как описано здесь для загрузки дагов из двух разных папок. Два различия guish между ними на веб-сервере, каждый файл python проверяет свою текущую папку и устанавливает владельца в default_args для сцены. Кроме того, этап устанавливается как очередь, и соответствующий рабочий процесс берет его.

Проблема заключается в реализации: загрузка дагов работает, но если я импортирую внешний файл python из подпапки в даге, который будет выполнить там, я получаю сообщение об ошибке, что модуль не может быть найден. Я предполагаю, что это связано с тем, что загрузчик dag загружает dags, а текущий рабочий каталог - это домашняя папка воздушного потока. Внешние файлы python, загруженные в даги, по-прежнему находятся в других папках, подключенных из s3.

Как вы можете видеть в Dockerfile, домашняя папка воздушного потока:

/usr/local/airflow

Моя структура кода в предыдущем каталоге выглядит следующим образом:

/dags
  add_dag_bag.py

/stages/staging/dags
  test_dag.py
  /scripts
    external_test.py

/stages/production/dags
  test_dag.py
  /scripts
    external_test.py

add_dag_bag.py выглядит следующим образом:

from airflow.models import DagBag
import os

dags_dirs = ['/usr/local/airflow/stages/staging/dags', '/usr/local/airflow/stages/production/dags']
for dir in dags_dirs:
    dag_bag = DagBag(os.path.expanduser(dir))
    if dag_bag:
        for dag_id, dag in dag_bag.dags.items():
            print(dag_id)
            globals()[dag_id] = dag

В test_dag.py я импортирую некоторые python вызывается из папки сценариев, например from scripts.external_test import callable.

Даг не загружается в веб-интерфейсе, в то время как я вижу следующую ошибку в журналах: No module named scripts.external_test

Любые идеи, как чтобы решить эту проблему?

Ответы [ 2 ]

0 голосов
/ 11 мая 2020

Я проводил дополнительные исследования, но пока не нашел решения. Возможно, мне все еще не хватает какой-то конфигурации. Я изучил упакованные DAG и пришел к следующей идее:

  • В зависимости от ветки (разработка или мастер) CircleCI создает zip-папки с дополнительной информацией в имя в зависимости от этапа, так что в usr/local/airflow/dags у меня есть две zip-папки, например, test_staging.zip и test_production.zip, обе содержат другие python файлы. В файле dag я могу изменить идентификатор DAG в зависимости от этапа, чтобы оба были загружены.
  • Проблема, похоже, в следующем: другие внешние python файлы в zip-папке имеют то же имя, но другое содержимое (в зависимости от ветки), и только один (тот, который загружается первым) доступен в воздушном потоке.

Это не решение, но показывает, что таким образом невозможно управлять разными этапами с использованием упакованных DAG. Однако, может быть, я упускаю из виду эту идею?

0 голосов
/ 07 мая 2020

Вы пробовали установить PYTHONPATH в домашнюю папку dag? Что-то вроде PYTHONPATH=../../dags python3 external.py

...