Воздушный поток принимает шаблон дзиндзя как строку - PullRequest
0 голосов
/ 26 ноября 2018

в Airflow я пытаюсь использовать шаблон jinja в airflow, но проблема в том, что он не анализируется, а рассматривается как строка.Пожалуйста, смотрите мой код ``

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def test_method(dag,network_id,schema_name):
    print "Schema_name in test_method", schema_name
    third_task = PythonOperator(
        task_id='first_task_' + network_id,
        provide_context=True,
        python_callable=print_context2,
        dag=dag)
    return third_task

dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
          schedule_interval='0 12 * * *',
          start_date= datetime.today(),
          catchup=False)


def print_context(ds, **kwargs):
    return 'Returning from print_context'

def print_context2(ds, **kwargs):
    return 'Returning from print_context2'

def get_schema(ds, **kwargs):
    # Returning schema name based on network_id
    schema_name = "my_schema"
    return get_schema

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=print_context,
    dag=dag)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=get_schema,
    dag=dag)

network_id = '{{ dag_run.conf["network_id"]}}'

first_task >> second_task >> test_method(
                    dag=dag,
                    network_id=network_id,
                    schema_name='{{ ti.xcom_pull("second_task")}}')

``

Создание Dag не выполняется, потому что '{{ dag_run.conf["network_id"]}}' принимается за строку как поток воздуха.Может кто-нибудь помочь мне с проблемой в моем коде ???

Ответы [ 2 ]

0 голосов
/ 27 ноября 2018

Объект DAG и его код определения не анализируются в контексте выполнения, он анализируется в отношении среды, доступной ему при загрузке Python.

Переменная network_id, который вы используете для определения task_id в вашей функции, не шаблонизируется перед выполнением, это не может быть, так как выполнение не активно.Даже при использовании шаблонов вам по-прежнему необходимо действительное статическое не шаблонное значение task_id для создания экземпляра объекта DAG.

0 голосов
/ 26 ноября 2018

Операторы воздушного потока имеют переменную с именем template_fields.Эта переменная обычно объявляется в верхней части класса операторов, посмотрите любой из операторов в кодовой базе github.

Если поле, в которое вы пытаетесь передать синтаксис шаблона Jinja, отсутствует в списке template_fields, синтаксис jinja отобразится в виде строки.

...