Ссылка на имя файла через xcom в Airflow - PullRequest
1 голос
/ 19 января 2020

Я пытаюсь понять, как передавать значения с помощью функции airflow xcom. Конкретный c сценарий использования, который я пытаюсь создать, состоит в том, чтобы написать файл, затем переместить его, а затем выполнить другую команду. Идея состоит в том, что я передаю имя файла от одного оператора другому.

Вот что у меня есть:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt

DAG = DAG(
  dag_id='xcom_test_dag',
  start_date=dt.datetime.now(),
  schedule_interval='@once'
)

def push_function(**context):
    file_name = 'test_file_{date}'.format(date=dt.datetime.now())
    return context['task_instance'].xcom_push(key='filename', value=file_name)

def pull_function(**context):
    dir(context['task_instance'].xcom_pull())

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

Если я хочу сослаться на имя файла в pull_task, чтобы я мог выполнить чтение файла - как мне это вызвать? Попытка доступа к context['task_instance'] не содержит значения. Далее - лучше ли пытаться ссылаться на имя файла, подобное этому, от задачи к задаче / от оператора к оператору?

1 Ответ

1 голос
/ 20 января 2020

При извлечении данных из XCOM вы хотите указать идентификатор задачи, в которую вы sh добавили данные. В вашем примере, task_id вашей задачи pu sh равен push_task, поэтому вы захотите сделать что-то вроде:

value = context['task_instance'].xcom_pull(task_ids='push_task')

Однако из документации по воздушному потоку обратите внимание, что:

По умолчанию xcom_pull () фильтрует ключи, которые автоматически передаются XComs, когда они возвращаются из функций выполнения (в отличие от XComs, которые нажимаются вручную).

Если вы отправляете данные в XCOM вручную с указанием c ключей, вам может потребоваться включить эту клавишу при вызове xcom_pull. В вашем примере вы нажимаете sh ключ под названием filename в своем задании pu sh, поэтому вам, вероятно, потребуется выполнить что-то подобное в задании на извлечение:

value = context['task_instance'].xcom_pull(task_ids='push_task', key='filename')

Эта информация более подробно изложено в документации Airflow: https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#concepts -xcom

Что касается вашего вопроса о «лучших методах» - для связи между Задачами / Операторами Airflow, XCOM - лучший способ до go. Однако, если вы хотите прочитать файл с диска несколькими операторами, вам необходимо убедиться, что все ваши работники имеют доступ к тому, где хранится файл. Если это невозможно, альтернативой может быть удаленное хранение файла задач pu sh (например, в AWS S3 ) и pu sh URL-адреса S3 для XCOM. Задача извлечения может затем прочитать URL-адрес S3 из XCOM и загрузить файл из S3.

...