Возможно, вы захотите проверить XCOM Airflow: https://airflow.apache.org/concepts.html#xcoms
Если вы возвращаете значение из функции, это значение сохраняется в xcom. В вашем случае вы можете получить к нему доступ из другого кода Python:
task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
или в шаблоне примерно так:
{{ task_instance.xcom_pull(task_ids='Task1') }}
Если вы хотите указать ключ, который вы можете нажать в XCOM (находясь внутри задачи):
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)
Затем вы можете получить к нему доступ так:
task_instance.xcom_pull(task_ids='my_task', key='the_key')
РЕДАКТИРОВАТЬ 1
Дополнительный вопрос: Вместо использования значения в другой функции, как я могу передать значение другому PythonOperator, например - "t2 =" BashOperator (task_id = 'Moving_bucket', bash_command = 'python / home / raw .py "% s" '% file_name, dag = dag) "--- я хочу получить доступ к file_name, которое возвращается" Task1 ". Как это может быть достигнуто?
Прежде всего, мне кажется, что на самом деле значение не передается другому PythonOperator
, а BashOperator
.
Во-вторых, это уже описано в моем ответе выше. Поле bash_command
является шаблонным (см. template_fields
в источнике: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py). Следовательно, мы можем использовать шаблонную версию:
BashOperator(
task_id='Moving_bucket',
bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
dag=dag,
)
РЕДАКТИРОВАТЬ 2
Пояснение:
Airflow работает следующим образом: он выполнит Task1, затем заполнит xcom, а затем выполнит следующую задачу. Итак, чтобы ваш пример работал, сначала нужно выполнить Task1, а затем выполнить Moving_bucket после Task1.
Поскольку вы используете функцию возврата, вы также можете опустить key='file'
из xcom_pull
и не устанавливать ее вручную в функции.