Как вызвать оператор внутри функции Python, используя Airflow? - PullRequest
0 голосов
/ 21 октября 2018

У меня есть следующий код:

def chunck_import(**kwargs):
    ...
    for i in range(1, num_pages + 1):
        start = lower +  chunks * i
        end = start + chunks
        if i>1:
             start = start + 1
        logging.info(start, end)
        if end > max_current:
            end = max_current
        where = 'where orders_id between {0} and {1}'.format(start,end)
        logging.info(where)
        import_orders_products_op = MySqlToGoogleCloudStorageOperator(
            task_id='import_orders_and_upload_to_storage_orders_products_{}'.format(i),
            mysql_conn_id='mysql_con',
            google_cloud_storage_conn_id='gcp_con',
            provide_context=True,
            approx_max_file_size_bytes = 100000000, #100MB per file
            sql = 'import_orders.sql',
            params={'WHERE': where},
            bucket=GCS_BUCKET_ID,
            filename=file_name_orders_products,
            dag=dag)

    start_task_op = DummyOperator(task_id='start_task', dag=dag)

    chunck_import_op = PythonOperator(
        task_id='chunck_import',
        provide_context=True,
        python_callable=chunck_import,
        dag=dag)

    start_task_op >>  chunck_import_op

Этот код использует PythonOperator, чтобы вычислить, сколько прогонов мне нужно из MySqlToGoogleCloudStorageOperator и создать кластер WHERE SQL, тогда ему нужновыполнить его.

Проблема в том, что MySqlToGoogleCloudStorageOperator не выполняется.

Я не могу на самом деле

chunck_import_op >> import_orders_products_op

Как я могу сделать MySqlToGoogleCloudStorageOperator будет казнен внутри PythonOperator?

1 Ответ

0 голосов
/ 22 октября 2018

Я думаю, что в конце цикла for вы захотите вызвать import_orders_products_op.execute(context=kwargs), возможно, перед import_orders_products_op.pre_execute(context=kwargs).Это немного сложно, поскольку он пропускает вызов render_templates() для task_instance, и на самом деле, если вместо этого вы сделали task_instance для выполнения каждой из этих задач, вы могли бы вместо этого вызвать run или _raw_run_task, нооба они требуют информацию от dagrun (которую вы можете получить в контексте вызываемого Python, например, kwargs['dag_run'])

Смотря на то, что вы передали операторам, это выглядит как как выВам понадобится шаг шаблона, чтобы загрузить файл import_orders.sql и заполнить параметр WHERE.В качестве альтернативы, внутри самого вызываемого объекта можно загрузить файл в строку, заменить часть {{ params.WHERE }} (и любые другие) вручную без Jinja2 (или вы могли бы потратить время на выяснение правильных вызовов jinja2), а затем установить import_orders_products_op.sql=the_string_you_loaded перед звонком import_orders_products_op.pre_execute(context=kwargs) и import_orders_products_op.execute(context=kwargs).

...