Есть ли способ сохранить значение XCOM во время повторного запуска шага DAG (после очистки состояния)?
Ниже приведена упрощенная версия того, что я пытаюсь выполнить sh, а именно, когда состояние шага группы доступности базы данных очищается и шаг запускается повторно, я хотел бы иметь возможность загрузить значение XCOM, выдавленное при предыдущем запуске. Однако, хотя я могу видеть значение в интерфейсе XCOM, значение не извлекается. Я просмотрел исходный код метода pull_xcom()
, но не могу понять, где он отфильтровывается.
Функциональность, которой я пытаюсь достичь, заключается в поддержании некоторого количества состояния между сбоями. прогоны DAG. В этом примере это будет означать, что 1 добавляется к сохраненному значению каждый раз, когда шаг DAG очищается и запускается повторно.
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def test_step(**kwargs):
ti = kwargs.get('task_instance')
value = ti.xcom_pull(key='key', include_prior_dates=True)
if value is None:
value = 0
print(f'BEFORE VALUE: {value}')
value += 1
print(f'AFTER VALUE: {value}')
ti.xcom_push(key='key', value=value)
# Simulating a failure
raise Exception
default_args = {
'owner': 'Testing',
'depends_on_past': False,
'email': ['test@test.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
dag = DAG(
'test_dag',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2020, 4, 9),
)
t1 = PythonOperator(
task_id='test_step',
provide_context=True,
python_callable=test_step,
dag=dag,
)
t1