Получение правильного user_defined_filter в Airflow - PullRequest
0 голосов
/ 25 мая 2018

Я пытаюсь получить простой Airflow (v1.9.0) user_defined_filters, работающий на основе Документов .Вот минимальный пример, который я пытаюсь запустить

def userfilter(id):
    return 'Hello' + id

default_args = {
        'start_date': datetime(2018, 5, 19),
        'user_defined_filters': dict(hello=lambda name: 'Hello%s' % name, filter2=userfilter),
        }

dag = DAG('mwe', default_args=default_args)
jinja_env = dag.get_template_env()
print(jinja_env.filters)

t1 = SimpleHttpOperator(
    task_id='helloworld',
#    endpoint="/api/{{ 'world' | filter2 }}",
    endpoint="/api/{{ 'world' | hello }}",
    method='GET',
    headers=None,
    response_check=None, 
    extra_options=None, 
    xcom_push=True, 
    http_conn_id='myconn',
    dag=dag)

В конечном итоге я хочу, чтобы userfilter вызывался со значениями, которые я извлек из xcom.Но даже в этом простом примере я получаю исключение:

{base_task_runner.py:98} INFO - Subtask File "<unknown>", line 1, in template
{base_task_runner.py:98} INFO - Subtask: jinja2.exceptions.TemplateAssertionError: no filter named 'hello'`

print(jinja_env.filters) также не показывает определенные пользовательские фильтры.Что мне здесь не хватает?

1 Ответ

0 голосов
/ 25 мая 2018

Параметр, который вы пытаетесь использовать, является не частью default_args, а параметром класса DAG:

dag = DAG('mwe', 
          default_args=default_args),
          user_defined_filters=dict(hello=lambda name: 'Hello%s' % name, filter2=userfilter)
          )

См. https://airflow.apache.org/_modules/airflow/models.html

...