Воздушный поток DAG в функциях? - PullRequest
0 голосов
/ 30 октября 2018

Я работаю в $AIRFLOW_HOME/dags. Я создал следующие файлы:

- common
  |- __init__.py   # empty
  |- common.py     # common code
- foo_v1.py        # dag instanciation

В common.py:

default_args = ...

def create_dag(project, version):
  dag_id = project + '_' + version
  dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
  print('creating DAG ' + dag_id)

  t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

  t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

  t2.set_upstream(t1)

В foo_v1.py:

 from common.common import create_dag

 create_dag('foo', 'v1')

При тестировании скрипта с python все выглядит нормально:

 $ python foo_v1.py
 [2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
 creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0

Затем я запускаю веб-сервер и планировщик локально. Моя проблема в том, что я не вижу DAG с идентификатором foo_v1. Файл pyc не создается. Что делается не так? Почему код в foo_v1.py не выполняется?

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Чтобы быть найденным Airflow, объект DAG, возвращаемый create_dag(), должен находиться в глобальном пространстве имен модуля foo_v1.py. Один из способов поместить группу доступности базы данных в глобальное пространство имен - просто назначить ее переменной уровня модуля:

from common.common import create_dag

dag = create_dag('foo', 'v1')

Другой способ - обновить глобальное пространство имен, используя globals():

globals()['foo_v1'] = create_dag('foo', 'v1')

Последнее может выглядеть как излишнее, но это полезно для динамического создания нескольких групп DAG . Например, в цикле for:

for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')

Примечание: Любой файл *.py, помещенный в $AIRFLOW_HOME/dags (даже в подкаталогах, например, common в вашем случае), будет проанализирован Airflow. Если вы не хотите этого, вы можете использовать упакованных DAG .

0 голосов
/ 30 октября 2018

Вам необходимо присвоить метку экспортируемой переменной в модуле. Если метка отсутствует в модуле, __dict__ процессор DagBag воздушного потока не поднимет ее.

Проверьте источник здесь: https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...