Шаблон воздушного потока не отображается при использовании с PostgresOperator при попытке передать значение из XCom - PullRequest
0 голосов
/ 09 марта 2020

Я написал довольно простую группу обеспечения доступности баз данных, которая сначала выполняет некоторые операции python, а затем должна обновить таблицу базы данных с возвращаемым значением. Вот DAG-код:

tmpl_search_path = Variable.get('sql_path')

dag = DAG('test_dag',
          description='',
          schedule_interval='@daily',
          template_searchpath=tmpl_search_path,
          start_date=datetime(2017, 3, 20),
          catchup=False)

# Returns a string
t1 = PythonOperator(task_id='t1',
                    python_callable= someCallable,
                    dag=dag)


update_monitoring_table_last_run = PostgresOperator(task_id='update_monitoring_table_last_run',
                                                    sql='update_monitoring_table_last_run.sql',
                                                    postgres_conn_id='conn_id',
                                                    params={"script_name": t1.task_id},
                                                    dag=dag)

update_monitoring_table_status_msg = PostgresOperator(task_id='update_monitoring_table_status_msg',
                                                      sql='update_monitoring_table_status_msg.sql',
                                                      postgres_conn_id='conn_id',
                                                      params={"script_name": str(t1.task_id),
                                                              "status_msg": "{{ ti.xcom_pull(key='return_value') }}"},
                                                      dag=dag)

t1 >> update_monitoring_table_last_run >> update_monitoring_table_status_msg

PostgresOperator для update_monitoring_table_last_run работает совершенно нормально, так что не беспокойтесь. Как update_monitoring_table_status_msg не работает, как задумано. Вот базовый шаблон SQL:

UPDATE table
SET status_msg = '{{ params.status_msg }}'
WHERE script_name = '{{ params.script_name }}'

Визуализированный шаблон выглядит так:

UPDATE bi.bi_scripts_monitoring
SET status_msg = '{{ ti.xcom_pull(key='return_value') }}'
WHERE script_name = 'script_name'

Но я хочу, чтобы он выглядел так:

UPDATE bi.bi_scripts_monitoring
SET status_msg = 'StringThatWasReturnedByPythonOperator'
WHERE script_name = 'script_name'

Почему здесь не отображается значение? Я попробовал то же самое с BashOperator, и там шаблон был правильно отображен? Любая помощь будет оценена. Я думаю, что мне не хватает чего-то, что XCom специфицирует c.

1 Ответ

1 голос
/ 09 марта 2020

params Ключевое слово не является шаблоном, поэтому вам нужно изменить ваш файл следующим образом:

UPDATE table
SET status_msg = '{{ params.status_msg }}'
WHERE script_name = '{{ ti.xcom_pull(key="return_value") }}'
...