Воздушный поток: при повторном запуске DAG не удается загрузить XCOM с предыдущего запуска - PullRequest
0 голосов
/ 03 мая 2020

Есть ли способ сохранить значение XCOM во время повторного запуска шага DAG (после очистки состояния)?

Ниже приведена упрощенная версия того, что я пытаюсь выполнить sh, а именно, когда состояние шага группы доступности базы данных очищается и шаг запускается повторно, я хотел бы иметь возможность загрузить значение XCOM, выдавленное при предыдущем запуске. Однако, хотя я могу видеть значение в интерфейсе XCOM, значение не извлекается. Я просмотрел исходный код метода pull_xcom(), но не могу понять, где он отфильтровывается.

Функциональность, которой я пытаюсь достичь, заключается в поддержании некоторого количества состояния между сбоями. прогоны DAG. В этом примере это будет означать, что 1 добавляется к сохраненному значению каждый раз, когда шаг DAG очищается и запускается повторно.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def test_step(**kwargs):
    ti = kwargs.get('task_instance')
    value = ti.xcom_pull(key='key', include_prior_dates=True)

    if value is None:
        value = 0
    print(f'BEFORE VALUE: {value}')
    value += 1
    print(f'AFTER VALUE: {value}')

    ti.xcom_push(key='key', value=value)

    # Simulating a failure
    raise Exception


default_args = {
    'owner': 'Testing',
    'depends_on_past': False,
    'email': ['test@test.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(
    'test_dag',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2020, 4, 9),
)

t1 = PythonOperator(
    task_id='test_step',
    provide_context=True,
    python_callable=test_step,
    dag=dag,
)

t1

1 Ответ

0 голосов
/ 03 мая 2020

Каждый раз, когда задача собирается выполняться, ее XCom очищается на текущую дату выполнения (https://github.com/apache/airflow/blob/1.10.10/airflow/models/taskinstance.py#L960). Вот почему вы никогда не получите значения из предыдущих попыток выполнения задачи. Использование include_prior_dates=True извлекает данные только из предыдущих дат выполнения, но не из предыдущих запусков с той же датой выполнения.

Одно из возможных решений - поместить задачу DummyOperator выше вашей задачи test_step, называемую скажем xcom_store.test_step. Затем используйте airflow.models.XCom.set () непосредственно в test_step для ваших значений XCom в задаче xcom_store.test_step (ссылка xcom_pu sh () в качестве примера) , Когда вам нужно вытащить, просто потяните, как обычно, но вместо этого из фиктивной задачи, т.е. ti.xcom_pull(task_ids='xcom_store.test_step', key='key'). Определенно не идеальный и может привести к некоторой путанице, но если вы стандартизируете это и создадите вокруг него помощников, это может быть хорошо?

...