Как динамически создавать операторы с разными параметрами - PullRequest
0 голосов
/ 11 октября 2018

У меня есть следующий код:

def chunck_import(**kwargs):
    ...
    logging.info('Number of pages required is: {0}'.format(num_pages))
    for i in range(1, num_pages + 1):
        ...
        parameter_where = 'where orders_id between {0} and {1}'.format(start,end)
        logging.info(parameter_where)

chunck_import_op = PythonOperator(
    task_id='chunck_import',
    provide_context=True,
    python_callable=chunck_import,
    dag=dag)


start_task_op >> ... >>  chunck_import_op

Этот оператор создает несколько WHERE операторов:

INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493

Теперь у меня есть MySqlToGoogleCloudStorageOperator следующим образом:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    ...
    sql = 'select * from orders {{ params.where_cluster }}',
    params={'where_cluster': parameter_where},
    dag=dag) 

chunck_import_op знает, сколько раз мне нужно вызвать MySqlToGoogleCloudStorageOperator - num_pages Также создается строка, которую мне нужно передать в качестве параметра- parameter_where

MyВопрос в том, как динамически создать MySqlToGoogleCloudStorageOperator в соответствии с num_pages и передать ему parameter_where.

Ответы [ 2 ]

0 голосов
/ 12 октября 2018

Я бы создал подкласс MySqlToGoogleCloudStorageOperator, чтобы настроить запрос и переопределить шаги выполнения, чтобы создать постраничный запрос в соответствии с параметром размера страницы, передаваемым оператору.Это некоторая дополнительная работа, но она рекомендуется для других вариантов здесь.

Однако вы не можете иметь PythonOperator или какого-либо оператора, изменить группу DAG (и распознать ее и запланировать).Максимум, что он может сделать, это одно из:

  1. После построения предложения where создайте MySqlToGoogleCloudStorageOperator с предложением и вызывайте execute для него прямо внутри PythonOperator.Это сработает, и вы увидите сообщения журнала из MySqlToGoogleCloudStorageOperator прямо в журналах PythonOperator.
  2. Используйте PythonOperator или TriggerDagRunOperator, чтобы вызвать другую группу обеспечения доступности баз данных с помощью только MySqlToGoogleCloudStorageOperatorпередавая предложение в качестве параметра или передавая его в XCOM для этой группы DAG.Этот другой DAG, вероятно, должен иметь Расписание, установленное на @NoneЭто усложнит отслеживание журналов, но может привести к параллельной работе групп DAG.

Если бы это был мой DAG, я думаю, что вместо этого мой подход (если не подклассификация) заключался бы в том, чтобы всегда обрабатыватьот 1 до X страниц.Предположим, что ваша группа доступности баз данных должна обрабатывать максимум X страниц результатов, где, например, X равно 10.Затем определите 10 ветвей от родителя chunck_import_op.Вам не понадобится chunck_import_op или вызываемый объект.

  • Каждая ветвь будет начинаться с ShortCircuitOperator, который вызывает один и тот же вызываемый объект с различными offset аргументами (от 0 до 9).Этот вызываемый будет проверять, больше ли offset * page_size, чем end, если да, то возвращает False, пропуская свои нисходящие операторы.В противном случае он отправит в xcom действительный запрос с диапазоном, основанным на смещении, и вернет True для их запуска.
  • Каждая ветвь продолжается с MySqlToGoogleCloudStorageOperator, для которого запрос установлен как {{ ti.xcom_pull('<ShortCircuitOperator_N>') }}, гдестрока - это имя предыдущего ShortCircuitOperator.
  • Если после MySqlToGoogleCloudStorageOperator s вам нужны другие операторы, сначала добавьте DummyOperator в качестве дочернего элемента для всех этих MySqlToGoogleCloudStorageOperator s и сделайтеtrigger_rule ALL_DONE, затем добавьте другие операторы в качестве дочерних для этого.

Таким образом, при необходимости вы можете запускать от 1 до 10 постраничных запросов.Хотя они могут работать параллельно, хотя я не думаю, что это потенциальная проблема, просто подумайте об этом.

0 голосов
/ 11 октября 2018

Воздушный поток обеспечивает механизм XComs для задач (операторов) для связи между собой.В вашем конкретном сценарии задача chunck_import может предварительно вычислить все операторы where и поместить их в XCom;тогда задача import_orders может вытащить XCom, прочитать все предложения where и использовать их по мере необходимости.

Если этот механизм не работает в логике вашего приложения, тогда, пожалуйста, измените свой вопрос и объясните, почему нет.

...