Как отобразить значения из Xcom с MySqlToGoogleCloudStorageOperator - PullRequest
0 голосов
/ 07 октября 2018

У меня есть следующий код:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='mysql_con',
    google_cloud_storage_conn_id='gcp_con',
    sql='SELECT * FROM orders where orders_id>{0};'.format(LAST_IMPORTED_ORDER_ID),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Я хочу изменить запрос на:

sql='SELECT * FROM orders where orders_id>{0} and orders_id<{1};'.format(LAST_IMPORTED_ORDER_ID, ...)

Значение для {1} создается с оператором в задаче перед этимодин.На него нажимают XCOM.

Как я могу прочитать значение здесь?Это должно быть что-то с xcom_pull, но как правильно это сделать?Могу ли я сделать этот параметр sql внутри оператора?

Я пытался сделать это:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='mysql_con',
    google_cloud_storage_conn_id='gcp_con',
    sql='SELECT * FROM orders where orders_id>{0} and orders_id<{1}'.format(LAST_IMPORTED_ORDER_ID,{{ task_instance.xcom_pull(task_ids=['get_max_order_id'], key='result_status') }}),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Это дает:

Broken DAG: имя 'task_instance'не определено

1 Ответ

0 голосов
/ 07 октября 2018

В вашем файле dag вы не находитесь в контексте dagrun с существующим экземпляром задачи, который вы можете использовать, как у вас.

Вы можете извлекать значение только тогда, когда оператор работает, а не когда выего настройка (последний контекст выполняется в цикле планировщиком и будет выполняться 1000 раз в день, даже если группа доступности баз данных еженедельно или была отключена).Но то, что вы написали, на самом деле очень близко к тому, что сработало бы, так что, возможно, вы уже рассмотрели этот контекстный момент.

Давайте напишем это как шаблон:

# YOUR EXAMPLE FORMATTED A BIT MORE 80 COLS SYTLE
…
sql='SELECT * FROM orders where orders_id>{0} and orders_id<{1}'.format(
    LAST_IMPORTED_ORDER_ID,
    {{ task_instance.xcom_pull(
        task_ids=['get_max_order_id'], key='result_status') }}),
…

# SHOULD HAVE BEEN AT LEAST: I hope you can spot the difference.
…
sql='SELECT * FROM orders where orders_id>{0} and orders_id<{1}'.format(
    LAST_IMPORTED_ORDER_ID,
    "{{ task_instance.xcom_pull("
    "task_ids=['get_max_order_id'], key='result_status') }}"),
…

# AND COULD HAVE BEEN MORE CLEARLY READABLE AS:
…
sql='''
SELECT *
FROM orders
WHERE orders_id > {{ params.last_imported_id }}
  AND orders_id < {{ ti.xcom_pull('get_max_order_id') }}
''',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID},
…

И я знаю, чтовы заполняете LAST_IMPORTED_ORDER_ID из переменной Airflow.Вы не могли бы сделать это в файле dag и вместо этого заменить {{ params.last_imported_id }} на {{ var.value.last_imported_order_id }} или на то, что вы назвали переменной Airflow, которую вы устанавливали.

...