Я пытаюсь сгенерировать набор динамических задач из переменной XCOM. В XCOM я храню список, и я хочу использовать каждый элемент списка для динамического создания последующей задачи.
Мой пример использования: у меня есть оператор восходящего потока, который проверяет файлы на sftp-сервере и возвращает список имен файлов, соответствующих определенным критериям. Я хочу создать динамические последующие задачи для каждого возвращаемого имени файла.
Я упростил это до следующего, и пока он работает, я чувствую, что это не идиоматическое решение для воздушного потока. В моем случае я бы написал функцию python, которая вызывается из оператора python, который извлекает значение из xcom и возвращает его вместо использования функции pusher.
Я понимаю, что, хотя я могу создать пользовательский оператор, который объединяет оба, я не думаю, что создание одноразового оператора является хорошей практикой, и я надеюсь, что есть другое решение.
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "test",
"depends_on_past": False,
"start_date": datetime(2018, 10, 27),
"email": ["test@mctest.com"],
"email_on_failure": False,
"email_on_retry": False,
"email_on_success": False,
"retries": 0,
"provide_context": True
}
dag = DAG("test", default_args=default_args, schedule_interval="@daily", catchup=False)
def pusher(**context):
return ['a', 'b', 'c', 'd', 'e']
pusher_task = PythonOperator(
task_id='pusher_task',
dag=dag,
python_callable=pusher
)
def bash_wrapper(task, **context):
return BashOperator(
task_id='dynamic'+task,
dag=dag,
bash_command='date'
)
end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')
pusher_task >> [bash_wrapper(task) for task in pusher()] >> end