Я написал довольно простую группу обеспечения доступности баз данных, которая сначала выполняет некоторые операции python, а затем должна обновить таблицу базы данных с возвращаемым значением. Вот DAG-код:
tmpl_search_path = Variable.get('sql_path')
dag = DAG('test_dag',
description='',
schedule_interval='@daily',
template_searchpath=tmpl_search_path,
start_date=datetime(2017, 3, 20),
catchup=False)
# Returns a string
t1 = PythonOperator(task_id='t1',
python_callable= someCallable,
dag=dag)
update_monitoring_table_last_run = PostgresOperator(task_id='update_monitoring_table_last_run',
sql='update_monitoring_table_last_run.sql',
postgres_conn_id='conn_id',
params={"script_name": t1.task_id},
dag=dag)
update_monitoring_table_status_msg = PostgresOperator(task_id='update_monitoring_table_status_msg',
sql='update_monitoring_table_status_msg.sql',
postgres_conn_id='conn_id',
params={"script_name": str(t1.task_id),
"status_msg": "{{ ti.xcom_pull(key='return_value') }}"},
dag=dag)
t1 >> update_monitoring_table_last_run >> update_monitoring_table_status_msg
PostgresOperator для update_monitoring_table_last_run
работает совершенно нормально, так что не беспокойтесь. Как update_monitoring_table_status_msg
не работает, как задумано. Вот базовый шаблон SQL:
UPDATE table
SET status_msg = '{{ params.status_msg }}'
WHERE script_name = '{{ params.script_name }}'
Визуализированный шаблон выглядит так:
UPDATE bi.bi_scripts_monitoring
SET status_msg = '{{ ti.xcom_pull(key='return_value') }}'
WHERE script_name = 'script_name'
Но я хочу, чтобы он выглядел так:
UPDATE bi.bi_scripts_monitoring
SET status_msg = 'StringThatWasReturnedByPythonOperator'
WHERE script_name = 'script_name'
Почему здесь не отображается значение? Я попробовал то же самое с BashOperator
, и там шаблон был правильно отображен? Любая помощь будет оценена. Я думаю, что мне не хватает чего-то, что XCom специфицирует c.