Создание архитектуры Airflow DAG, требующей контекстного регулирования - PullRequest
6 голосов
/ 26 мая 2020
  • У меня есть группа единиц задания (рабочих), которую я хочу запустить как DAG
  • Группа 1 имеет 10 рабочих, и каждый рабочий выполняет несколько извлечений таблицы из БД. Обратите внимание, что каждый рабочий процесс сопоставляется с одним экземпляром БД, и каждый рабочий должен успешно работать со 100 таблицами в общей сложности, прежде чем он сможет успешно пометить себя как завершенный.
  • Группа 1 имеет ограничение, которое гласит, что во всех этих Одновременно следует употреблять 10 рабочих. Например:
    • Worker1 извлекает 2 таблицы
    • Worker2 извлекает 2 таблицы
    • Worker3 извлекает 1 таблицу
    • Worker4 ... Worker10 нужно подождать пока Worker1 ... Worker3 не откажется от потоков
    • Worker4 ... Worker10 может забирать таблицы, как только потоки на шаге 1 освобождаются
    • Когда каждый рабочий завершает все 100 таблиц, он переходит к step2 без ожидания. Step2 не имеет ограничений на параллелизм

Я должен иметь возможность создать один узел Group1, который обслуживает дросселирование, а также иметь

  • 10 независимых узлов рабочих, чтобы я мог перезапустить их в случае, если кто-то из них выйдет из строя

Я попытался объяснить это на следующей диаграмме: enter image description here

  • Если какой-либо из рабочих выйдет из строя, я могу перезапустить его, не затрагивая других рабочих. Он по-прежнему использует тот же пул потоков из Group1, поэтому ограничения параллелизма применяются
  • Group1 будет завершена после завершения всех элементов step1 и step2
  • Step2 не имеет никаких мер параллелизма

Как реализовать такую ​​иерархию в Airflow для приложения Spring Boot Java? Можно ли разработать такой тип DAG, используя конструкции Airflow, и динамически сообщать Java приложению, сколько таблиц оно может извлекать за раз. Например, если все рабочие процессы, кроме Worker1, завершены, Worker1 теперь может использовать все 5 доступных потоков, а все остальное перейдет к шагу 2.

1 Ответ

1 голос
/ 05 июня 2020

Эти ограничения не могут быть смоделированы в виде ориентированного ациклического c графа и, следовательно, не могут быть реализованы в воздушном потоке точно так, как описано. Однако они могут быть смоделированы как очереди и, таким образом, могут быть реализованы с помощью структуры очереди заданий. Вот ваши два варианта:

Неоптимально реализовать как DAG воздушного потока:

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
# Executors that inherit from BaseExecutor take a parallelism parameter
from wherever import SomeExecutor, SomeOperator

# Table load jobs are done with parallelism 5
load_tables = SubDagOperator(subdag=DAG("load_tables"), executor=SomeExecutor(parallelism=5))

# Each table load must be it's own job, or must be split into sets of tables of predetermined size, such that num_tables_per_job * parallelism = 5
for table in tables:
    load_table = SomeOperator(task_id=f"load_table_{table}", dag=load_tables)

# Jobs done afterwards are done with higher parallelism
afterwards = SubDagOperator(
    subdag=DAG("afterwards"), executor=SomeExecutor(parallelism=high_parallelism)
)

for job in jobs:
    afterward_job = SomeOperator(task_id=f"job_{job}", dag=afterwards)

# After _all_ table load jobs are complete, start the jobs that should be done afterwards

load_tables > afterwards

Неоптимальный аспект здесь заключается в том, что для первой половины DAG кластер будет недоиспользован higher_parallelism - 5.

Оптимальная реализация с очередью заданий:

# This is pseudocode, but could be easily adapted to a framework like Celery

# You need two queues
# The table load queue should be initialized with the job items
table_load_queue = Queue(initialize_with_tables)
# The queue for jobs to do afterwards starts empty
afterwards_queue = Queue()

def worker():

    # Work while there's at least one item in either queue
    while not table_load_queue.empty() or not afterwards_queue.empty():
        working_on_table_load = [worker.is_working_table_load for worker in scheduler.active()]

        # Work table loads if we haven't reached capacity, otherwise work the jobs afterwards
        if sum(working_on_table_load) < 5:
            is_working_table_load = True
            task = table_load_queue.dequeue()
        else
            is_working_table_load = False
            task = afterwards_queue.dequeue()

        if task:
            after = work(task)
            if is_working_table_load:

                # After working a table load, create the job to work afterwards
                afterwards_queue.enqueue(after)

# Use all the parallelism available
scheduler.start(worker, num_workers=high_parallelism)

При таком подходе кластер не будет использоваться недостаточно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...