Как передать SQL как файл с параметрами в Airflow Operator - PullRequest
0 голосов
/ 07 октября 2018

У меня есть оператор в потоке воздуха:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Теперь фактический запрос, который мне нужно выполнить, имеет длину 24 строки.Я хочу сохранить его в файл и указать оператору путь к файлу SQL.Оператор поддерживает это, но я не уверен, что делать с параметром, для которого нужен SQL.

Предложения?

РЕДАКТИРОВАТЬ: Это мой код:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Это дает:

jinja2.exceptions.UndefinedError: 'templates_dict' не определено

1 Ответ

0 голосов
/ 07 октября 2018

Как вы заметили, MySqlToGoogleCloudStorageOperator задает template_ext с расширением .sql.

Сначала в вашем Dag укажите путь, куда вы положили свой файл .sql

dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])

В yourfile.sql поместите свой большой запрос.Обратите внимание на params.ord_id

SELECT * FROM orders where orderid> {{ params.ord_id }}

Теперь в аргументе оператора sql передайте имя файла.

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',
    params={"ord_id":99},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Важно, чтобы после этого имени файла не было пробела.Это потому, что шаблонизатор Jinja будет искать эту строку, заканчивающуюся .sql, и если он это сделает, он будет обрабатывать ее как файл, а не как строку.

...