Триггер DAG не может получить параметры от TriggerDagRunOperator - PullRequest
0 голосов
/ 18 января 2019

Я попытался вызвать еще один dag с некоторыми параметрами в TriggerDagRunOperator, но в сработавшем dag объект dag_run всегда имеет значение None.

В TriggerDagRunOperator параметр сообщения добавляется в полезную нагрузку dag_run_obj.

def conditionally_trigger(context, dag_run_obj):
if context['params']['condition_param']:
    dag_run_obj.payload = {'message': context['params']['message']}
    pp.pprint(dag_run_obj.payload)
    return dag_run_obj


trigger = TriggerDagRunOperator(
    task_id='test_trigger_dagrun',
    trigger_dag_id="example_trigger_target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Hello World'},
    dag=dag,
)

Я ожидал, что сработавший DAG может получить его, используя kwargs ['dag_run']. Conf ['message']), но, к сожалению, он не работает.

def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".
      format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

Объект dag_run в kwargs равен None

INFO - Executing <Task(PythonOperator): run_this> on 2019-01-18 16:10:18 
INFO - Subtask: [2019-01-18 16:10:27,007] {models.py:1433} ERROR - 'NoneType' object has no attribute 'conf'
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/Library/Python/2.7/site-packages/airflow/models.py", line 1390, in run
INFO - Subtask:     result = task_copy.execute(context=context)
INFO - Subtask:   File "/Library/Python/2.7/site-packages/airflow/operators/python_operator.py", line 80, in execute
INFO - Subtask:     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
INFO - Subtask:   File "/Library/Python/2.7/site-packages/airflow/example_dags/example_trigger_target_dag.py", line 52, in run_this_func
INFO - Subtask:     print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'conf'

Я также распечатал kwargs, и действительно, объект 'dag_run' - None. Дагс - пример кода в Airflow, поэтому я не уверен, что случилось. Кто-нибудь знает причину?

INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': None, u'tomorrow_ds_nodash': u'20190119', u'run_id': None, u'dag': <DAG: example_trigger_target_dag>, u'prev_execution_date': None, ...

Кстати, если я запускаю DAG из CLI, это сработало:

$ airflow trigger_dag 'example_trigger_target_dag' -r 'run_id' --conf '{"message":"test_cli"}'

Журналы:

INFO - Subtask: kwargs: {u'next_execution_date': None, u'dag_run': <DagRun example_trigger_target_dag @ 2019-01-18 ...

INFO - Subtask: Remotely received value of test_cli for key=message
...