Я попытался вызвать еще один dag с некоторыми параметрами в TriggerDagRunOperator, но в сработавшем dag объект dag_run всегда имеет значение None.
В TriggerDagRunOperator параметр сообщения добавляется в полезную нагрузку dag_run_obj.
def conditionally_trigger(context, dag_run_obj):
if context['params']['condition_param']:
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
trigger = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id="example_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello World'},
dag=dag,
)
Я ожидал, что сработавший DAG может получить его, используя kwargs ['dag_run']. Conf ['message']), но, к сожалению, он не работает.
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag,
)
Объект dag_run в kwargs равен None
INFO - Executing <Task(PythonOperator): run_this> on 2019-01-18 16:10:18
INFO - Subtask: [2019-01-18 16:10:27,007] {models.py:1433} ERROR - 'NoneType' object has no attribute 'conf'
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/Library/Python/2.7/site-packages/airflow/models.py", line 1390, in run
INFO - Subtask: result = task_copy.execute(context=context)
INFO - Subtask: File "/Library/Python/2.7/site-packages/airflow/operators/python_operator.py", line 80, in execute
INFO - Subtask: return_value = self.python_callable(*self.op_args, **self.op_kwargs)
INFO - Subtask: File "/Library/Python/2.7/site-packages/airflow/example_dags/example_trigger_target_dag.py", line 52, in run_this_func
INFO - Subtask: print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'conf'
Я также распечатал kwargs, и действительно, объект 'dag_run' - None.
Дагс - пример кода в Airflow, поэтому я не уверен, что случилось.
Кто-нибудь знает причину?
INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': None, u'tomorrow_ds_nodash': u'20190119', u'run_id': None, u'dag': <DAG: example_trigger_target_dag>, u'prev_execution_date': None, ...
Кстати, если я запускаю DAG из CLI, это сработало:
$ airflow trigger_dag 'example_trigger_target_dag' -r 'run_id' --conf '{"message":"test_cli"}'
Журналы:
INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': <DagRun example_trigger_target_dag @ 2019-01-18 ...
INFO - Subtask: Remotely received value of test_cli for key=message