Мой рабочий процесс:
- Я получаю максимальный идентификатор заказа, который у нас сейчас есть, из переменной
LAST_IMPORTED_ORDER_ID
- Я получаю максимум
order_id
вMySQL
база данных - заказов на импорт между
LAST_IMPORTED_ORDER_ID
к значению в xcom
из MySQL
с использованием MySqlToGoogleCloudStorageOperator
Пока все хорошо, и это работает хорошо.
Однако проблема заключается в том, что разрыв между значением слишком велик.Это может быть 500K
заказов.Невозможно импортировать столько записей одновременно.
У MySqlToGoogleCloudStorageOperator
есть возможность разбить файл, сохраненный в хранилище, на куски с помощью approx_max_file_size_bytes
, но у него нет возможности разбивать запрос на части.
В основном то, что я хочусделать это использовать что-то вроде подкачки для запроса.Если
xcom_order_id - LAST_IMPORTED_ORDER_ID > 50K
, то разбить запрос на строки длиной до 50 КБ, что означает, что мне нужно динамически создавать операторы.
Это то, что я пытался сделать:
LAST_IMPORTED_ORDER_ID = Variable.get("last_order_id_imported")
start_task_op = DummyOperator(task_id='start_task', dag=dag)
def chunck_import(**kwargs):
ti = kwargs['ti']
xcom = int(ti.xcom_pull(task_ids='get_max_order_id_2_months_ago'))
current = int(LAST_IMPORTED_ORDER_ID)
if xcom - current < 50000:
num_pages = 1
else:
num_pages = int((xcom / current) + 1)
logging.info(xcom)
logging.info(current)
for i in range(1, num_pages + 1): #for 1 page its range(1,2)
start = LAST_IMPORTED_ORDER_ID * i
end = start + 50000
if end > xcom:
end = xcom
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders_and_upload_to_storage_orders-{}'.format(i),
mysql_conn_id='mysqlcon',
google_cloud_storage_conn_id='googlecon',
provide_context=True,
approx_max_file_size_bytes=100000000,
sql='select * from e.orders where orders_id between {{ params.start }} and {{ params.end }}',
params={'start': start, 'end': end},
bucket=GCS_BUCKET_ID,
filename=file_name_orders,
dag=dag)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> get_max_order_id_2_months_ago >> chunck_import_op
Это имеетошибок нет, и он работает успешно, но ничего не делает.
Значение в XCOM
является правильным.но chunck_import_op
ничего не делает.Также я не вижу динамически создаваемый MySqlToGoogleCloudStorageOperator
в моем пользовательском интерфейсе:
Также обратите внимание на print num_pages
Я также не вижузначение в журнале.
Как это исправить?