Я пытаюсь получить простой Airflow (v1.9.0) user_defined_filters
, работающий на основе Документов .Вот минимальный пример, который я пытаюсь запустить
def userfilter(id):
return 'Hello' + id
default_args = {
'start_date': datetime(2018, 5, 19),
'user_defined_filters': dict(hello=lambda name: 'Hello%s' % name, filter2=userfilter),
}
dag = DAG('mwe', default_args=default_args)
jinja_env = dag.get_template_env()
print(jinja_env.filters)
t1 = SimpleHttpOperator(
task_id='helloworld',
# endpoint="/api/{{ 'world' | filter2 }}",
endpoint="/api/{{ 'world' | hello }}",
method='GET',
headers=None,
response_check=None,
extra_options=None,
xcom_push=True,
http_conn_id='myconn',
dag=dag)
В конечном итоге я хочу, чтобы userfilter
вызывался со значениями, которые я извлек из xcom
.Но даже в этом простом примере я получаю исключение:
{base_task_runner.py:98} INFO - Subtask File "<unknown>", line 1, in template
{base_task_runner.py:98} INFO - Subtask: jinja2.exceptions.TemplateAssertionError: no filter named 'hello'`
print(jinja_env.filters)
также не показывает определенные пользовательские фильтры.Что мне здесь не хватает?