У меня есть файл, который определяет объект DAG:
dags / my_dag.py
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'pilota',
'depends_on_past': False,
'start_date': datetime(2019, 10, 1),
'email': ['some@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
}
bts_dag = DAG(
'hist_data_etl', default_args=default_args, schedule_interval='@once')
Затем в другом файле я импортирую созданный dag иопределить мои задачи:
from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col
NUM_ENGINES = 4
template_command = '''
ipcluster start n {{ params.cluster }}
sleep 5
'''
start_iparallel_cluster = BashOperator(
task_id='start_cluster',
bash_command=template_command,
retries=3,
params={'params': NUM_ENGINES},
dag=bts_dag)
import_hist_bts_data_task = PythonOperator(
task_id='fetch_transform_hist_col',
python_callable=fetch_and_transform_bts_data_col,
op_kwargs={
'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
},
dag=bts_dag)
start_iparallel_cluster >> import_hist_bts_data_task
проверка работоспособности:
$ airflow list_dags
выход:
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl
однако
$ airflow list_tasks hist_data_etl
не выводит ни одну из моих задач. Каким-то образом airflow не регистрирует задачи, относящиеся к группе обеспечения доступности баз данных, определенной в другом файле.
Пожалуйста, помогите:)