Python Airflow - возврат результата из PythonOperator - PullRequest
0 голосов
/ 03 мая 2018

Я написал DAG с несколькими PythonOperators

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                          provide_context=True,
                          python_callable=Task1, dag=dag1)

def Task1(**kwargs):
    return(kwargs['dag_run'].conf.get('file'))

Из PythonOperator я вызываю метод «Task1». Этот метод возвращает значение, это значение, которое мне нужно передать следующему PythonOperator. Как я могу получить значение из переменной «task1» или Как я могу получить значение, которое возвращается из метода Task1?

обновлено:

    def Task1(**kwargs):
          file_name = kwargs['dag_run'].conf.get[file]
          task_instance = kwargs['task_instance']
          task_instance.xcom_push(key='file', value=file_name) 
          return file_name

  t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)

  t2 =   BashOperator(
      task_id='Moving_bucket', 
      bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
      dag=dag,
    )

t2.set_upstream(t1)

1 Ответ

0 голосов
/ 03 мая 2018

Возможно, вы захотите проверить XCOM Airflow: https://airflow.apache.org/concepts.html#xcoms

Если вы возвращаете значение из функции, это значение сохраняется в xcom. В вашем случае вы можете получить к нему доступ из другого кода Python:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')

или в шаблоне примерно так:

{{ task_instance.xcom_pull(task_ids='Task1') }}

Если вы хотите указать ключ, который вы можете нажать в XCOM (находясь внутри задачи):

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

Затем вы можете получить к нему доступ так:

task_instance.xcom_pull(task_ids='my_task', key='the_key')

РЕДАКТИРОВАТЬ 1

Дополнительный вопрос: Вместо использования значения в другой функции, как я могу передать значение другому PythonOperator, например - "t2 =" BashOperator (task_id = 'Moving_bucket', bash_command = 'python / home / raw .py "% s" '% file_name, dag = dag) "--- я хочу получить доступ к file_name, которое возвращается" Task1 ". Как это может быть достигнуто?

Прежде всего, мне кажется, что на самом деле значение не передается другому PythonOperator, а BashOperator.

Во-вторых, это уже описано в моем ответе выше. Поле bash_command является шаблонным (см. template_fields в источнике: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py). Следовательно, мы можем использовать шаблонную версию:

BashOperator(
  task_id='Moving_bucket', 
  bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
  dag=dag,
)

РЕДАКТИРОВАТЬ 2

Пояснение: Airflow работает следующим образом: он выполнит Task1, затем заполнит xcom, а затем выполнит следующую задачу. Итак, чтобы ваш пример работал, сначала нужно выполнить Task1, а затем выполнить Moving_bucket после Task1.

Поскольку вы используете функцию возврата, вы также можете опустить key='file' из xcom_pull и не устанавливать ее вручную в функции.

...