У меня есть следующий код:
def chunck_import(**kwargs):
...
logging.info('Number of pages required is: {0}'.format(num_pages))
for i in range(1, num_pages + 1):
...
parameter_where = 'where orders_id between {0} and {1}'.format(start,end)
logging.info(parameter_where)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> ... >> chunck_import_op
Этот оператор создает несколько WHERE
операторов:
INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493
Теперь у меня есть MySqlToGoogleCloudStorageOperator
следующим образом:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
...
sql = 'select * from orders {{ params.where_cluster }}',
params={'where_cluster': parameter_where},
dag=dag)
chunck_import_op
знает, сколько раз мне нужно вызвать MySqlToGoogleCloudStorageOperator
- num_pages
Также создается строка, которую мне нужно передать в качестве параметра- parameter_where
MyВопрос в том, как динамически создать MySqlToGoogleCloudStorageOperator
в соответствии с num_pages
и передать ему parameter_where
.