В вашем файле dag вы не находитесь в контексте dagrun с существующим экземпляром задачи, который вы можете использовать, как у вас.
Вы можете извлекать значение только тогда, когда оператор работает, а не когда выего настройка (последний контекст выполняется в цикле планировщиком и будет выполняться 1000 раз в день, даже если группа доступности баз данных еженедельно или была отключена).Но то, что вы написали, на самом деле очень близко к тому, что сработало бы, так что, возможно, вы уже рассмотрели этот контекстный момент.
Давайте напишем это как шаблон:
# YOUR EXAMPLE FORMATTED A BIT MORE 80 COLS SYTLE
…
sql='SELECT * FROM orders where orders_id>{0} and orders_id<{1}'.format(
LAST_IMPORTED_ORDER_ID,
{{ task_instance.xcom_pull(
task_ids=['get_max_order_id'], key='result_status') }}),
…
# SHOULD HAVE BEEN AT LEAST: I hope you can spot the difference.
…
sql='SELECT * FROM orders where orders_id>{0} and orders_id<{1}'.format(
LAST_IMPORTED_ORDER_ID,
"{{ task_instance.xcom_pull("
"task_ids=['get_max_order_id'], key='result_status') }}"),
…
# AND COULD HAVE BEEN MORE CLEARLY READABLE AS:
…
sql='''
SELECT *
FROM orders
WHERE orders_id > {{ params.last_imported_id }}
AND orders_id < {{ ti.xcom_pull('get_max_order_id') }}
''',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID},
…
И я знаю, чтовы заполняете LAST_IMPORTED_ORDER_ID
из переменной Airflow.Вы не могли бы сделать это в файле dag и вместо этого заменить {{ params.last_imported_id }}
на {{ var.value.last_imported_order_id }}
или на то, что вы назвали переменной Airflow, которую вы устанавливали.