Как действительно создать n задач в SubDAG на основе результата предыдущей задачи - PullRequest
0 голосов
/ 15 мая 2018

Я создаю динамический DAG в Airflow, используя SubDAG. Что мне нужно, так это то, что количество задач внутри SubDAG определяется результатом предыдущей задачи (переменная subtask_ids функции middle_section должна быть той же переменной, что и функция initial_task).

Дело в том, что я не могу получить доступ к xcom внутри функции subdag SubDagOperator, потому что у меня нет контекста. Кроме того, я не могу обратиться к какой-либо БД для чтения какого-либо значения из-за функции DAG автоматического обнаружения планировщика: middle_section выполняется каждые несколько секунд.

Как вы, ребята, решаете это? Создать динамическое количество задач внутри SubDAG в зависимости от результата предыдущей задачи?

Вот код, который я разрабатываю:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


def initial_task(**context):
    subtask_ids = [0, 1, 2]
    task_instance = context['ti']
    task_instance.xcom_push(key='depot_ids', value=subtask_ids)


def middle_section_task(subtask_id):
    print(subtask_id)


def middle_section(parent_dag, arg):
    subdag = DAG(dag_id=f'{dag.dag_id}.middle',
                 default_args=args, schedule_interval='@once')

    subtask_ids = ''  # Read from xcom

    for subtask_id in subtask_ids:
        PythonOperator(task_id=f'{dag.dag_id}.middle_section_task_{subtask_id}',
                       python_callable=middle_section_task,
                       op_kwargs={'subtask_id': subtask_id}, dag=subdag)

    return subdag


def end_task(**context):
    print('Finished')


dag = DAG(dag_id='stackoverflow', default_args=args, schedule_interval=None)

initial = PythonOperator(task_id='start_task', python_callable=initial_task,
                         provide_context=True, dag=dag)

middle = SubDagOperator(task_id='middle', subdag=middle_section(dag, args),
                        default_args=args, dag=dag)

end = PythonOperator(task_id='end_task', python_callable=end_task,
                     provide_context=True, dag=dag)

initial >> middle >> end

1 Ответ

0 голосов
/ 16 мая 2018

У меня была та же проблема, я не мог на 100% решить проблему «воздушным потоком», так как я думаю, что число задач и подзадач воздушного потока определяется в момент проверки DAG.И при проверке никакая задача не запускается, поэтому воздушный поток не знает заранее, сколько запланированных задач subdag.task.

Способ, которым я обошел эту проблему, может быть не лучшим (я открыт дляпредложения), но это работает:

main_dag.py

# imports omitted for brevity
def get_info_from_db():
    # get info from db or somewhere else, this info will define the number of subdag tasks to run
    return urls, names

dag = DAG(...)

urls, names = get_info_from_db()

# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)
sub_section = SubDagOperator(
    task_id='import-file',
    subdag=imported_subdag(DAG_NAME, 'subdag-name', args, urls=urls, file_names=names),
    default_args=args,
    dag=dag,
)
end = DummyOperator(task_id='end', default_args=args, dag=dag)

start.set_downstream(sub_section)
section_1.set_downstream(end)

Тогда, наконец, у меня есть мой subdag.py (убедитесь, что его можно обнаружить из потока воздуха) на случай, если он находится в отдельном файле

# imports omitted for brevity
def fetch_files(file_url, file_name):
    # get file and save it to disk
    return file_location

# this is how I get info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
    ti = kwargs['ti']
    task = 'fetch_file-{}'.format(task_id)
    file_location = ti.xcom_pull(task_ids=task)

def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    for i in range(len(urls)):
        # the task name should also be dynamic in order not to have duplicates
        validate_file_operator = PythonOperator(task_id='validate_file-{}'.format(i+1),
                                                python_callable=validate_file,
                                                provide_context=True, dag=dag_subdag, op_kwargs={'task_id': i + 1})
        fetch_operator = PythonOperator(task_id='fetch_file-{}'.format(i+1),
                                        python_callable=fetch_zip, dag=dag_subdag,
                                        op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
        fetch_operator.set_downstream(validate_file_operator)
    return dag_subdag

По сути, моя логика заключается в том, что в момент проверки Airflow get_info_from_db() выполняется, и все dag и subdags правильно планируются динамически.Если я добавляю или удаляю содержимое из базы данных, число задач, которые нужно выполнить, будет обновлено при следующей проверке dag.

Этот подход подходит для моего варианта использования, но я надеюсь, что в будущем Airflow поддержит эту функцию (динамическое количество задач / подзадач.задач) изначально.

...