Нумерация записей с использованием MySqlToGoogleCloudStorageOperator - PullRequest
0 голосов
/ 11 октября 2018

Мой рабочий процесс:

  1. Я получаю максимальный идентификатор заказа, который у нас сейчас есть, из переменной LAST_IMPORTED_ORDER_ID
  2. Я получаю максимум order_id вMySQL база данных
  3. заказов на импорт между LAST_IMPORTED_ORDER_ID к значению в xcom из MySQL с использованием MySqlToGoogleCloudStorageOperator

Пока все хорошо, и это работает хорошо.

Однако проблема заключается в том, что разрыв между значением слишком велик.Это может быть 500K заказов.Невозможно импортировать столько записей одновременно.

У MySqlToGoogleCloudStorageOperator есть возможность разбить файл, сохраненный в хранилище, на куски с помощью approx_max_file_size_bytes, но у него нет возможности разбивать запрос на части.

В основном то, что я хочусделать это использовать что-то вроде подкачки для запроса.Если
xcom_order_id - LAST_IMPORTED_ORDER_ID > 50K, то разбить запрос на строки длиной до 50 КБ, что означает, что мне нужно динамически создавать операторы.

Это то, что я пытался сделать:

LAST_IMPORTED_ORDER_ID = Variable.get("last_order_id_imported")

start_task_op = DummyOperator(task_id='start_task', dag=dag)

def chunck_import(**kwargs):
    ti = kwargs['ti']
    xcom = int(ti.xcom_pull(task_ids='get_max_order_id_2_months_ago'))
    current = int(LAST_IMPORTED_ORDER_ID)
    if xcom - current < 50000:
        num_pages = 1
    else:
        num_pages = int((xcom / current) + 1)
    logging.info(xcom)
    logging.info(current)
    for i in range(1, num_pages + 1):  #for 1 page its range(1,2)
        start = LAST_IMPORTED_ORDER_ID * i
        end = start + 50000
        if end > xcom:
            end = xcom
        import_orders_op = MySqlToGoogleCloudStorageOperator(
            task_id='import_orders_and_upload_to_storage_orders-{}'.format(i),
            mysql_conn_id='mysqlcon',
            google_cloud_storage_conn_id='googlecon',
            provide_context=True,
            approx_max_file_size_bytes=100000000,
            sql='select * from e.orders where orders_id between {{ params.start }} and {{ params.end }}',
            params={'start': start, 'end': end},
            bucket=GCS_BUCKET_ID,
            filename=file_name_orders,
            dag=dag)


chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)

start_task_op >> get_max_order_id_2_months_ago >> chunck_import_op

Это имеетошибок нет, и он работает успешно, но ничего не делает.

Значение в XCOM является правильным.но chunck_import_op ничего не делает.Также я не вижу динамически создаваемый MySqlToGoogleCloudStorageOperator в моем пользовательском интерфейсе:

enter image description here

Также обратите внимание на print num_pages Я также не вижузначение в журнале.

Как это исправить?

1 Ответ

0 голосов
/ 11 октября 2018

К сожалению для вас, оператор не может изменить входную группу DAG. Поскольку вы можете извлекать только xcom при выполнении оператора, я бы предложил вместо добавления оператора в DAG вместо вас в конце настройки циклаОператоры up, внутри цикла, вызывают:

import_orders_op.pre_execute(**kwargs)
import_orders_op.execute(**kwargs)

Это немного глупо, поскольку весь вывод журнала будет в задаче chunck_import, которую вы, возможно, захотите переименовать логически для себя (import_in_chunks?), но он должен работать, и ваша группа доступности базы данных не изменит точное количество задач за один прогон.установить пару ShortCircuitOperator и MySqlToGoogleCloudStorageOperator для каждого диапазона на основе чанка.ShortCircuitOperator должен проверить, допустим ли начальный диапазон блока, и запустить опцию sql 2 gcs, если она есть, или короткое замыкание, если это не так.

Лучшим подходом было бы создание подкласса MySqlToGoogleCloudStorageOperator в PagedMySqlToGCSOperator, переезд execute, _query_mysql и _write_local_data_files.Хотя это больше работы.

...