I have a requirement where there is a parent DAG with a single task that produces certain (non-fixed) parameters; let's call them params1, params2 and params3. From that task in the parent DAG I now want to create three DAGs, each with those parameters available in the context of its tasks. I went by the following link on creating DAGs dynamically and tried the code further below -
https://airflow.incubator.apache.org/faq.html#how-can-i-create-dags-dynamically
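For reference, my reading of that FAQ entry is the pattern sketched below: the DAG objects are built in a loop while the DAGs folder is parsed (not inside a running task) and assigned to globals() so the scheduler can pick them up. This is only a minimal sketch; build_dag and params_list are placeholder names for illustration, not part of my actual code.

import logging
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def build_dag(dag_id, date_range):
    # One DAG object per parameter set, created at module parse time.
    dag = DAG(dag_id, schedule_interval='@once', start_date=datetime(2017, 3, 23))
    PythonOperator(
        task_id='child_task',
        python_callable=lambda **context: logging.info(context['date_range']),
        op_kwargs={'date_range': date_range},
        provide_context=True,
        dag=dag)
    return dag

# The parameter sets have to be known at parse time for the child DAGs to exist.
params_list = [{'min_date': '2017-01-01', 'max_date': '2017-01-31'}]
for i, d1 in enumerate(params_list, start=1):
    child_dag_id = 'child_dag_id' + str(i)
    globals()[child_dag_id] = build_dag(child_dag_id, d1)

In my case, however, the parameters only become available while the parent task runs, so I tried to create the child DAGs from inside that task, as shown here: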
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# XcomManager and BigQueryManager are project-specific helpers (not shown here).

class ParentBigquerySql(object):
    def __init__(self):
        pass

    def run(self, **context):
        logging.info('Running job')
        batch_id = 100
        #parent_sql = '''SELECT max(run_start_date) AS run_start_date,
        #                       max(run_end_date) AS run_end_date
        #                FROM `vintel_rel_2_0_staging_westfield.in_venue_batch_dates_daily`'''
        parent_sql = '''SELECT run_start_date, run_end_date
                        FROM vintel_rel_2_0_staging_westfield.in_venue_batch_dates_daily
                        ORDER BY 1, 2'''
        params = self.get_params(batch_id, parent_sql)
        XcomManager.push_query_params(context, params)
        return params

    def get_params(self, batch_id, parent_sql):
        batch_id = str(batch_id)
        result = BigQueryManager.read_query_to_table(parent_sql)
        t_list = []
        if result and type(result) is not list and result.error_result:
            #LogManager.info("Error in running the parent jobs - %s." % (result.error_result))
            #LogManager.info("Not populating cache... ")
            pass
        elif len(result) > 0:
            for row in result:
                if len(row) > 0:
                    run_start_date = row[0]
                    run_end_date = row[1]
                    if run_start_date and run_end_date:
                        t_list.append({'min_date': run_start_date, 'max_date': run_end_date})
        params = {}
        params['date_range'] = t_list
        return params


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 3, 23),
    'retries': 1,
    'provide_context': True,
    'retry_delay': timedelta(minutes=2),
}

dag = DAG('parent_dynamic_job_dag',  # give the dag a name
          schedule_interval='@once',
          default_args=default_args)


def pull_child11(**context):
    logging.info(" Date range " + str(context['date_range']))


def conditionally_trigger(context, dag_run_obj):
    return dag_run_obj


def create_dag_from_task(**context):
    job = ParentBigquerySql()
    job.run(**context)
    logging.info("Context data")
    logging.info(context)
    params = XcomManager.pull_query_params(context)
    logging.info("Xcom parameters: " + str(params))
    tl = []
    counter = 1
    for d1 in params['date_range']:
        dyn_dag_id = 'child_dag_id' + str(counter)
        dag_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': context['execution_date'],
            'execution_date': context['execution_date'],
            'retries': 1,
            'provide_context': True,
            'retry_delay': timedelta(minutes=2),
        }
        dyn_dag = DAG(dyn_dag_id,  # give the dag a name
                      schedule_interval='@once',
                      default_args=dag_args)
        t1 = PythonOperator(
            task_id='child' + str(counter),
            dag=dyn_dag,
            provide_context=True,
            python_callable=pull_child11,
            op_kwargs={'dag_id': 10, 'date_range': d1})
        t2 = TriggerDagRunOperator(task_id='test_trigger_dag',
                                   trigger_dag_id='child_dag_id' + str(counter + 1),
                                   python_callable=conditionally_trigger,
                                   dag=dyn_dag)
        t1.set_downstream(t2)
        logging.info("Updating globals for the dag " + dyn_dag_id)
        #trigger_op.execute(context)
        globals()[dyn_dag_id] = dyn_dag  # Assign DAG objects to the global namespace
        if counter > 2:
            break
        counter = counter + 1


push1 = PythonOperator(
    task_id='100-Parent',
    dag=dag,
    provide_context=True,
    python_callable=create_dag_from_task,
    op_kwargs={'dag_id': 100})

push11 = PythonOperator(
    task_id='101-Child',
    dag=dag,
    provide_context=True,
    python_callable=pull_child11,
    op_kwargs={'dag_id': 100,
               'date_range': {'start_date': 'temp_start_date', 'end_date': 'temp_end_date'}})

t2 = TriggerDagRunOperator(task_id='test_trigger_dag',
                           trigger_dag_id='child_dag_id1',
                           python_callable=conditionally_trigger,
                           dag=dag)

push1.set_downstream(push11)
push11.set_downstream(t2)
I am getting the following error -
[2018-05-01 09:24:27,764] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-01 09:24:27,875] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags
[2018-05-01 09:25:02,074] {models.py:1197} INFO - Dependencies all met for <TaskInstance: parent_dynamic_job_dag.test_trigger_dag 2018-04-23 00:00:00 [up_for_retry]>
[2018-05-01 09:25:02,081] {base_executor.py:49} INFO - Adding to queue: airflow run parent_dynamic_job_dag test_trigger_dag 2018-04-23T00:00:00 --local -sd DAGS_FOLDER/test_dynamic_parent_child.py
[2018-05-01 09:25:07,003] {sequential_executor.py:40} INFO - Executing command: airflow run parent_dynamic_job_dag test_trigger_dag 2018-04-23T00:00:00 --local -sd DAGS_FOLDER/test_dynamic_parent_child.py
[2018-05-01 09:25:08,235] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-01 09:25:08,431] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags/test_dynamic_parent_child.py
[2018-05-01 09:26:44,207] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run parent_dynamic_job_dag test_trigger_dag 2018-04-23T00:00:00 --job_id 178 --raw -sd DAGS_FOLDER/test_dynamic_parent_child.py']
[2018-05-01 09:26:45,243] {base_task_runner.py:98} INFO - Subtask: [2018-05-01 09:26:45,242] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-01 09:26:45,416] {base_task_runner.py:98} INFO - Subtask: [2018-05-01 09:26:45,415] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags/test_dynamic_parent_child.py
[2018-05-01 09:27:49,798] {base_task_runner.py:98} INFO - Subtask: [2018-05-01 09:27:49,797] {models.py:189} INFO - Filling up the DagBag from /mnt/test_project /airflow/dags
[2018-05-01 09:27:50,108] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-05-01 09:27:50,108] {base_task_runner.py:98} INFO - Subtask: File "/Users/manishz/anaconda2/bin/airflow", line 27, in <module>
[2018-05-01 09:27:50,109] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-05-01 09:27:50,109] {base_task_runner.py:98} INFO - Subtask: File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-05-01 09:27:50,110] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-05-01 09:27:50,110] {base_task_runner.py:98} INFO - Subtask: File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-05-01 09:27:50,110] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-05-01 09:27:50,111] {base_task_runner.py:98} INFO - Subtask: File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-05-01 09:27:50,111] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2018-05-01 09:27:50,112] {base_task_runner.py:98} INFO - Subtask: File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/operators/dagrun_operator.py", line 67, in execute
[2018-05-01 09:27:50,112] {base_task_runner.py:98} INFO - Subtask: dr = trigger_dag.create_dagrun(
[2018-05-01 09:27:50,112] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'create_dagrun'
[2018-05-01 09:28:14,407] {jobs.py:2521} INFO - Task exited with return code 1
[2018-05-01 09:28:14,569] {jobs.py:1959} ERROR - Task instance <TaskInstance: parent_dynamic_job_dag.test_trigger_dag 2018-04-23 00:00:00 [failed]> failed
[2018-05-01 09:28:14,573] {models.py:4584} INFO - Updating state for <DagRun parent_dynamic_job_dag @ 2018-04-23 00:00:00: backfill_2018-04-23T00:00:00, externally triggered: False> considering 3 task(s)
[2018-05-01 09:28:14,576] {models.py:4631} INFO - Marking run <DagRun parent_dynamic_job_dag @ 2018-04-23 00:00:00: backfill_2018-04-23T00:00:00, externally triggered: False> failed
[2018-05-01 09:28:14,600] {jobs.py:2125} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | kicked_off: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Traceback (most recent call last):
File "/Users/manishz/anaconda2/bin/airflow", line 27, in <module>
args.func(args)
File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 185, in backfill
delay_on_limit_secs=args.delay_on_limit)
File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/models.py", line 3724, in run
job.run()
File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
self._execute()
File "/Users/manishz/anaconda2/lib/python2.7/site-packages/airflow/jobs.py", line 2441, in _execute
raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
Some task instances failed:
%s
But the code above does not trigger the subsequent child DAGs. Any idea what is going on here?
Thanks in advance,
Manish