У меня есть следующий рабочий процесс:
- Получить номер из MySqlOperator (динамический)
- Получить значение, хранящееся в переменной (статическая)
- Создать строку на основеоба.
- Используйте строку как команду sql для
MySqlToGoogleCloudStorageOperator
.
Теперь это оказалось трудным делом.
Это мой код:
VALUE_FROM_VARIABLE = Variable.get("my_var")
query = 'SELECT ... FROM orders where orders_id>{0}
and orderid<{1};'.format(VALUE_FROM_MySqlOperator, VALUE_FROM_VARIABLE)
file_name = ...
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders_and_upload_to_storage',
mysql_conn_id='mysql_con',
google_cloud_storage_conn_id='gcp_con',
sql=query,
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
Моя проблема в том, что я не могу получить доступ к MySqlOperator
XCOM, в котором хранится номер, необходимый для запроса.
Поэтому я попытался получить к нему доступ в PythonOperator
и построить строку запроса следующим образом:
def func(ds, **kwargs):
ti = kwargs['ti']
VALUE_FROM_MySqlOperator = str(ti.xcom_pull(task_ids='mySQL_task')) # get the XCOM of MySqlOperator
query = 'SELECT ... FROM orders where orders_id>{0}
and orderid<{1};'.format(VALUE_FROM_MySqlOperator, VALUE_FROM_VARIABLE)
return query
py_op = PythonOperator(
task_id='py_op_task',
provide_context=True,
python_callable=func,
xcom_push=True,
dag=dag)
Но теперь я не могу передать новый сгенерированный запрос в MySqlToGoogleCloudStorageOperator
, потому что не могу прочитатьXCOM внутри этого оператора.
Как мне выйти из этого?