DAG не распознает задачи Airflow - PullRequest
1 голос
/ 02 октября 2019

У меня есть файл, который определяет объект DAG:

dags / my_dag.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['some@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

bts_dag = DAG(
    'hist_data_etl', default_args=default_args, schedule_interval='@once')

Затем в другом файле я импортирую созданный dag иопределить мои задачи:

from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start n {{ params.cluster }}
    sleep 5
'''

start_iparallel_cluster = BashOperator(
    task_id='start_cluster',
    bash_command=template_command,
    retries=3,
    params={'params': NUM_ENGINES},
    dag=bts_dag)


import_hist_bts_data_task = PythonOperator(
    task_id='fetch_transform_hist_col',
    python_callable=fetch_and_transform_bts_data_col,
    op_kwargs={
        'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
    },
    dag=bts_dag)

start_iparallel_cluster >> import_hist_bts_data_task

проверка работоспособности:

$ airflow list_dags

выход:

 -------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl

однако

$ airflow list_tasks hist_data_etl

не выводит ни одну из моих задач. Каким-то образом airflow не регистрирует задачи, относящиеся к группе обеспечения доступности баз данных, определенной в другом файле.

Пожалуйста, помогите:)

1 Ответ

1 голос
/ 02 октября 2019
  • Из-за способа, которым синтаксический анализ файла dag работает в Airflow, я не ожидаю, что это сработает
  • Даже у меня нет полного представления о внутренностях, ноВоздушный поток порождает дочерние процессы для анализа файлов определения dag (файлов, определенных некоторыми признаками). Каждый процесс анализирует разные подмножества файлов => вполне вероятно, что разные файлы обрабатываются разными процессами
  • Я считаю, что в вашей реализации логический порядок синтаксического анализа файлов (сначала анализ файла dag, а затем файла задачи)) не сохраняется, и поэтому все не работает

Однако, с некоторыми изменениями в вашем подходе, вы можете получить этот рабочий

первый файл

# dag_object_creator.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['some@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

def create_dag_object():
    bts_dag = DAG(dag_id='hist_data_etl',
                  default_args=default_args,
                  schedule_interval='@once')
    return bts_dag

второй файл

# tasks_creator.py

# this import statement is problematic
# from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start n {{ params.cluster }}
    sleep 5
'''

def create_bash_task(bts_dag):
    start_iparallel_cluster = BashOperator(
        task_id='start_cluster',
        bash_command=template_command,
        retries=3,
        params={'params': NUM_ENGINES},
        dag=bts_dag)
    return start_iparallel_cluster


def create_python_task(bts_dag):
    import_hist_bts_data_task = PythonOperator(
        task_id='fetch_transform_hist_col',
        python_callable=fetch_and_transform_bts_data_col,
        op_kwargs={
            'bucket': 'pilota-ml-raw-store', 'path': 'flights/', 'num_files': 1
        },
        dag=bts_dag)
    return import_hist_bts_data_task

третий файл

# dag_definition_file.py

import dag_object_creator
import tasks_creator

# create dag object
# stuff from 'dag_object_creator.py' can be put here directly,
# i just broke down things for clarity
bts_dag = dag_object_creator.create_dag_object()

# create tasks
start_iparallel_cluster = tasks_creator.create_bash_task(bts_dag)
import_hist_bts_data_task = tasks_creator.create_python_task(bts_dag)

# chaining tasks
start_iparallel_cluster >> import_hist_bts_data_task

приведенный выше макет кода обеспечит следующее поведение

  • предварительный процессначинает синтаксический анализ только dag_definition_file.py (два других файла пропускаются, так как в глобальной области видимости «DAG» не создается)

  • , так как при выполнении операторов import эти файлы анализируются

  • когда выполняются операторы создания dag / задачи, объекты DAG и задачи соответственно создаются в глобальной области видимости

, поэтому все становится на свои местахорошо, и эта реализация должна работать (не проверено, но основано на неофициальных знаниях)


Рекомендуемые чтения

...