Воздушный поток: доступ к ti.xcom_pull () из шаблонного SQL в PostgresOperator - PullRequest
0 голосов
/ 25 апреля 2019

У меня есть задача, созданная из PostgresOperator следующим образом:

sql = "select * from {{ ti.xcom_pull(key='foo') }};"
task = PostgresOperator(sql=sql)

dag не загружается, говоря "jinja2.exceptions.UndefinedError: 'ti' не определено.

Я получаю ту же ошибку, когда заменяю 'ti' на task_instance. Любая помощь будет высоко ценится!

Странно то, что если я заменим ti.xcom_pull(...) на ti, он будет отображаться без жалоб, возвращая <TaskInstance: foo.bar 2019-04-25T14:27:06.822835+00:00 [None]>

1 Ответ

1 голос
/ 25 апреля 2019

Я проверил: {{task_instance.xcom_pull(task_ids='taskidwherexcomisset', key='foo').encode('utf-8')}}, но я уверен, что {{ ti.xcom_pull(key='foo') }} тоже будет работать.

try:

sql = 'select * from ' + "{{ti.xcom_pull(key='foo').encode('utf-8')}}" + ';',

пример строки журнала, чтобы показать, что она работает:

[2019-04-25 15:39:24,715] {base_task_runner.py:98} INFO - Subtask: [2019-04-25 15:39:24,714] {postgres_operator.py:52} INFO - Executing: select * from 2019/04/24/;
...