Airflow DAG с конфигурацией / параметром json и l oop для этого параметра для генерации операторов - PullRequest
0 голосов
/ 14 июля 2020

У меня даг срабатывает вручную. Он принимает такие параметры, как:

{"id_list":"3,5,1"}

В DAG я создаю операторы динамически на основе этого списка целых чисел:

for id in id_list:
   task = create_task(id)

Мне нужно инициализировать id_list на основе значений параметров id_list. Как я могу инициализировать этот список, если я не могу напрямую ссылаться на этот параметр, когда я не в шаблонном поле? Вот как я хочу видеть это в представлении графика, где задачи процесса основаны на параметрах id_list.

введите описание изображения здесь

Я видел примеры динамически создаваемых задач, но на самом деле они не являются динамическими c в том смысле, что значения списка жестко запрограммированы. Задачи создаются динамически на основе списка значений жесткого кода, если это имеет смысл.

Ответы [ 2 ]

1 голос
/ 21 июля 2020

A DAG и его задачи должны быть решены до того, как станет доступным для использования; это включает веб-сервер, планировщик, везде. Веб-сервер на самом деле является прекрасным примером, почему: как бы вы визуализировали процесс для пользователя?

Единственными динамическими c компонентами процесса являются параметры, доступные во время визуализации шаблона. В большинстве случаев я видел, как люди использовали от PythonOperator до l oop над вводом и выполняли какое-то действие N раз для решения той же проблемы.

0 голосов
/ 21 июля 2020

Сначала создайте фиксированное количество задач для выполнения. В этом примере используется PythonOperator. В python_callable, если index меньше, чем длина param_list, выполните else raise AirflowSkipException

        def execute(index, account_ids):
            param_list = account_ids.split(',')
            if index < len(param_list):
                print(f"execute task index {index}")
            else:
                raise AirflowSkipException


        def create_task(task_id, index):
            return PythonOperator(task_id=task_id,
                                  python_callable=execute,
                                  op_kwargs={
                                      "index": index,
                                      "account_ids": "{{ dag_run.conf['account_ids'] }}"}
                                  )

        record_size_limit = 5
        ACCOUNT_LIST = [None] * record_size_limit

        for idx in range(record_size_limit):
            task = create_task(f"task_{idx}", idx)
            task

Trigger DAG и передайте это как параметры:

enter image description here

Graph View:

введите описание изображения здесь

...