У меня есть следующий код:
def chunck_import(**kwargs):
...
for i in range(1, num_pages + 1):
start = lower + chunks * i
end = start + chunks
if i>1:
start = start + 1
logging.info(start, end)
if end > max_current:
end = max_current
where = 'where orders_id between {0} and {1}'.format(start,end)
logging.info(where)
import_orders_products_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders_and_upload_to_storage_orders_products_{}'.format(i),
mysql_conn_id='mysql_con',
google_cloud_storage_conn_id='gcp_con',
provide_context=True,
approx_max_file_size_bytes = 100000000, #100MB per file
sql = 'import_orders.sql',
params={'WHERE': where},
bucket=GCS_BUCKET_ID,
filename=file_name_orders_products,
dag=dag)
start_task_op = DummyOperator(task_id='start_task', dag=dag)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> chunck_import_op
Этот код использует PythonOperator
, чтобы вычислить, сколько прогонов мне нужно из MySqlToGoogleCloudStorageOperator
и создать кластер WHERE
SQL, тогда ему нужновыполнить его.
Проблема в том, что MySqlToGoogleCloudStorageOperator
не выполняется.
Я не могу на самом деле
chunck_import_op >> import_orders_products_op
Как я могу сделать MySqlToGoogleCloudStorageOperator
будет казнен внутри PythonOperator
?