При извлечении данных из XCOM вы хотите указать идентификатор задачи, в которую вы sh добавили данные. В вашем примере, task_id вашей задачи pu sh равен push_task
, поэтому вы захотите сделать что-то вроде:
value = context['task_instance'].xcom_pull(task_ids='push_task')
Однако из документации по воздушному потоку обратите внимание, что:
По умолчанию xcom_pull () фильтрует ключи, которые автоматически передаются XComs, когда они возвращаются из функций выполнения (в отличие от XComs, которые нажимаются вручную).
Если вы отправляете данные в XCOM вручную с указанием c ключей, вам может потребоваться включить эту клавишу при вызове xcom_pull
. В вашем примере вы нажимаете sh ключ под названием filename
в своем задании pu sh, поэтому вам, вероятно, потребуется выполнить что-то подобное в задании на извлечение:
value = context['task_instance'].xcom_pull(task_ids='push_task', key='filename')
Эта информация более подробно изложено в документации Airflow: https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#concepts -xcom
Что касается вашего вопроса о «лучших методах» - для связи между Задачами / Операторами Airflow, XCOM - лучший способ до go. Однако, если вы хотите прочитать файл с диска несколькими операторами, вам необходимо убедиться, что все ваши работники имеют доступ к тому, где хранится файл. Если это невозможно, альтернативой может быть удаленное хранение файла задач pu sh (например, в AWS S3 ) и pu sh URL-адреса S3 для XCOM. Задача извлечения может затем прочитать URL-адрес S3 из XCOM и загрузить файл из S3.