Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самый последний)? - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть 3 прогона DAG:

  1. DAGR 1 выполнено в 2019-02-13 16: 00: 00
  2. DAGR 2 выполнено в 2019-02-13 17:00: 00
  3. DAGR 3 выполнено в 2019-02-13 18: 00: 00

В экземпляре задачи X из DAGR 1 Я хочу получить значение xcom задачиэкземпляр Y.Я сделал это:

kwargs['task_instance'].xcom_pull(task_ids='Y')

Я ожидал получить значение xcom из экземпляра задачи Y в DAGR 1.Вместо этого я получил от DAGR 3.

Из документации Airflow

Если xcom_pull передается одна строка для task_ids, то самое последнее значение XCom из этой задачивернулся;...

  1. Почему Airflow xcom_pull возвращает самое последнее значение xcom?
  2. Что если я хочу извлечь из того же прогона DAG?

Ответы [ 2 ]

0 голосов
/ 13 февраля 2019
  • Я думаю, вы ищете include_prior_dates параметр из xcom_pull() метод
  • Обратите внимание, что он вернет всю историю Xcom с (python list, каждый элемент представляет собой одну xcom запись ), выдаваемую данным task (отфильтрованным по task_id (s)), а затем вам придется вручную отфильтровать желаемое xcom, используя execution_date поле
  • Может быть трудно указать точное execution_date для фильтрации;для этого посмотрите, как они реализовали execution_delta и execution_date_fn params в ExternalTaskSensor
0 голосов
/ 13 февраля 2019

Это отвечает на ваш вопрос [Как извлечь значение xcom из другого экземпляра задачи в том же прогоне DAG (не самого последнего)?]
См. пример ниже:

t1 = SomeOperator(
        task_id='Your_t1_Task_ID',
        ...
        ...
        dag=dag)

    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
        string_to_print = 'Value in xcom is: {}'.format(xcom)
        #string_to_print holds that value, you can also print it in the logs
        logging.info(string_to_print)

    t2 = PythonOperator(
        task_id='records',
        provide_context=True,
        python_callable=get_records,
        dag=dag)

    t1 >> t2
...