Использование подстановки строки Python и xcom_pull в операторе HttpSensor Airflow - PullRequest
0 голосов
/ 21 мая 2019

У меня есть сценарий использования, где я внутри цикла for, и мне нужно динамически заполнить поля в задаче HttpSensor

Я попытался использовать этот синтаксис:

Метод 1 СБОЙ:

s = 'sensor_task_sd_{0}'.format(d)
            sensor_task_sd = HttpSensor(
                task_id=s,
                http_conn_id='ss_api',
                endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
                request_params={'X-Requested-By': 'abc_123'},
                response_check=lambda response: True if "FINISHED" in response.text else False,
                poke_interval=10,
                soft_fail=True,
                timeout=600,
                dag=dag_subdag,
                )

но это не получается, потому что в этой строке:

endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)

Я не могу заставить подстановку строки питона с помощью .format (t) работать.

Вместо этого, если я жестко закодирую какой-то кодзначение приведенного выше кода работает ... например, приведенный ниже код работает нормально:

Метод 2 УСПЕХ:

s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
               task_id=s,
               http_conn_id='ss_api',
        endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids='start_pipeline_sd_campaignhistory')}}/status?rev=0",
        request_params={'X-Requested-By': 'abc_123'},
               response_check=lambda response: True if "FINISHED" in response.text else False,
               poke_interval=10,
               soft_fail=True,
               timeout=600,
               dag=dag_subdag)

Я просто не понимаю этот бит ..... Я пробовал каждыйкомбинация трюков, чтобы заставить его работать, он просто не берет интерполяции строк, которые я использую для поддержания динамического кода.

, поэтому мой вопрос очень прост:

Как мне сделать оператор HttpSensorдинамический?Я не хочу жестко кодировать значения моей функции в строке конечной точки (стиль метода 2), я хотел бы использовать значения, установленные во время выполнения (стиль метода 1).

Как я могу заставить метод 1 работать

1 Ответ

1 голос
/ 21 мая 2019

Таким образом, Airflow использует Jinja для шаблонирования своих строк, и когда вы смешиваете шаблоны Jinja и форматирование Python, вам нужно «убрать» фигурные скобки, которые нужны Jinja, чтобы форматирование Python не использовало их.Вы делаете это, удваивая каждую фигурную скобку, которая не предназначена для вызова .format().

Это должно дать вам необходимые результаты.

endpoint="/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={}) }}}}/status?rev=0".format(t)

Кстати, по моему опыту, я могу использовать f-строки (Python 3.6+) или именованные параметры форматирования, если вы действительно можете помочь ясности кода при смешивании двух в воздушном потокескрипт.Но вам все равно нужно «экранировать» фигурные скобки.

f-строки:

endpoint=f"/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={t})}} }}/status?rev=0"

Параметры именованного формата:

endpoint="/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={task_id}) }}}}/status?rev=0".format(task_id=t)

Надеюсь, что поможет:)

...