Возможность создания групп DAG в Airflow - PullRequest
0 голосов
/ 25 февраля 2020

Есть ли способ, которым можно динамически создать файл DAG из кода и загрузить его по потоку (AirFlow читает из каталога dags, но создание файла для каждого DAG и загрузка его в эту папку выполняется медленно)

Можно ли создать шаблонную метку и заполнить ее новыми логинами c всякий раз, когда это необходимо?

Я видел, что они работают над API. В текущей версии есть только триггер DAG.

Ответы [ 2 ]

1 голос
/ 25 февраля 2020

Да, вы можете создавать динамические c группы обеспечения доступности баз данных следующим образом:

from datetime import datetime

from airflow import DAG

from airflow.operators.python_operator import PythonOperator


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)

    return dag


# build a dag for each number in range(10)
for n in range(1, 10):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'

    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

Пример из https://www.astronomer.io/guides/dynamically-generating-dags/

Однако обратите внимание, что это может вызвать некоторые проблемы как задержки между выполнением задач. Это связано с тем, что Airflow Scheduler и Worker придется анализировать весь файл при планировании / выполнении каждой задачи для одной группы обеспечения доступности баз данных.

Так как в одном файле будет много DAG (скажем, 100), это будет означать, что все 100 объектов DAG должны быть проанализированы при выполнении одной задачи для DAG1.

Я бы порекомендовал создать инструмент, который создает один файл для DAG.

1 голос
/ 25 февраля 2020

Вы можете довольно легко создать несколько тегов в одном файле:

create_dag(dag_id):
  dag = DAG(....)
  // some tasks added
  return dag

for dag_id in dags_lists:
  globals()[dag_id] = create_dag(dag_id)

Если вы создадите правильный объект DAG с помощью функции шаблона (create_dag в приведенном выше примере) и сделаете их доступными в объект globalbals, Airflow распознает их как отдельные группы DAG.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...