Airflow - создание динамических задач из XCOM - PullRequest
3 голосов
/ 14 апреля 2019

Я пытаюсь сгенерировать набор динамических задач из переменной XCOM. В XCOM я храню список, и я хочу использовать каждый элемент списка для динамического создания последующей задачи.

Мой пример использования: у меня есть оператор восходящего потока, который проверяет файлы на sftp-сервере и возвращает список имен файлов, соответствующих определенным критериям. Я хочу создать динамические последующие задачи для каждого возвращаемого имени файла.

Я упростил это до следующего, и пока он работает, я чувствую, что это не идиоматическое решение для воздушного потока. В моем случае я бы написал функцию python, которая вызывается из оператора python, который извлекает значение из xcom и возвращает его вместо использования функции pusher.

Я понимаю, что, хотя я могу создать пользовательский оператор, который объединяет оба, я не думаю, что создание одноразового оператора является хорошей практикой, и я надеюсь, что есть другое решение.

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["test@mctest.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher  
)

def bash_wrapper(task, **context):
    return BashOperator(
        task_id='dynamic'+task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end

Ответы [ 2 ]

2 голосов
/ 12 июня 2019

Я бы не стал делать то, чего ты пытаешься достичь, потому что:

  1. Значение XCOM - это состояние, сгенерированное в время выполнения
  2. Структура DAG определяется чем-то за время разбора

Даже если вы используете что-то вроде следующего, чтобы получить доступ к значениям XCOM, сгенерированным какой-то вышестоящей задачей:

from airflow.models import TaskInstance
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def get_files_list(session):
    execution_date = dag.previous_schedule(datetime.now())

    // Find previous task instance:
    ti = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id == upstream_task_id).first()
    if ti:
        files_list = ti.xcom_pull()
        if files_list:
            return files_list
    // Return default state:
    return {...}


files_list = get_files_list()
// Generate tasks based on upstream task state:
task = PythonOperator(
    ...
    xcom_push=True,
    dag=dag)

Но это будет вести себя очень странно, потому что анализ DAG и выполнение задач не синхронизируются так, как вы хотите.

Если основная причина, по которой вы хотите это сделать - это распараллеливание обработки файлов, у меня будет некоторое статическое число задач обработки (определяемое требуемым параллелизмом), которые читают список файлов из значения XCOM вышестоящей задачи и работают с соответствующей частью этот список.

Другим вариантом является распараллеливание обработки файлов с использованием некоторой инфраструктуры для распределенных вычислений, такой как Apache Spark.

0 голосов
/ 12 июня 2019

Самый простой способ, который я могу придумать, - это использовать оператор ветвления.https://github.com/apache/airflow/blob/master/airflow/example_dags/example_branch_operator.py

...