Как вы заметили, MySqlToGoogleCloudStorageOperator задает template_ext
с расширением .sql.
Сначала в вашем Dag
укажите путь, куда вы положили свой файл .sql
dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])
В yourfile.sql поместите свой большой запрос.Обратите внимание на params.ord_id
SELECT * FROM orders where orderid> {{ params.ord_id }}
Теперь в аргументе оператора sql
передайте имя файла.
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql='yourfile.sql',
params={"ord_id":99},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
Важно, чтобы после этого имени файла не было пробела.Это потому, что шаблонизатор Jinja будет искать эту строку, заканчивающуюся .sql
, и если он это сделает, он будет обрабатывать ее как файл, а не как строку.