Какой самый чистый способ написать и запустить DAG задач? - PullRequest
2 голосов
/ 27 октября 2019

Я хочу написать и запустить ориентированный ациклический граф (DAG) с несколькими задачами, работающими последовательно или параллельно. В идеале это должно выглядеть так:

def task1():
    # ...

def task2():
    # ...

graph = Sequence([
    task1,
    task2,
    Parallel([
        task3,
        task4
    ]),
    task5
]

graph.run()

Это будет работать 1 -> 2 -> (3 и 4 одновременно) -> 5. Задачи должны иметь доступ к глобальной области видимости для хранения результатов, записи журналов и доступапараметры командной строки.

Мой вариант использования - написание сценария развертывания. Параллельные задачи связаны с IO: обычно ожидают на удаленном сервере выполнения шага.

Я изучил многопоточность, asyncio, Airflow, но не нашел ни одной простой библиотеки, которая позволила бы этобез какого-либо шаблонного кода, чтобы пройти и контролировать выполнение графа. Существует ли что-нибудь подобное?

1 Ответ

2 голосов
/ 27 октября 2019

Вот быстрое подтверждение концепции. Его можно использовать следующим образом:

graph = sequence(
            lambda: print(1),
            lambda: print(2),
            parallel(
                lambda: print(3),
                lambda: print(4),
                sequence(
                    lambda: print(5),
                    lambda: print(6))),
             lambda: print(7)

graph()

1
2
3
5
6
4
7

sequence создает функцию, которая оборачивает цикл for, а parallel создает функцию, которая оборачивает использование пула потоков:

from typing import Callable
from multiprocessing.pool import ThreadPool

Task = Callable[[], None]

_pool: ThreadPool = ThreadPool()

def sequence(*tasks: Task) -> Task:
    def run():
        for task in tasks:
            task()

    return run  # Returning "run" to be used as a task by other "sequence" and "parallel" calls

def parallel(*tasks: Task) -> Task:
    def run():
        _pool.map(lambda f: f(), tasks)  # Delegate to a pool used for IO tasks

    return run

Каждый вызов sequence и parallel возвращает новую «задачу» (функция, не имеющая аргументов и ничего не возвращающая). Затем эта задача может быть вызвана другими внешними вызовами sequence и parallel.

Что нужно отметить о ThreadPool:

  • . используйте пул потоков для parallel, из-за GIL он все равно будет выполнять только одну вещь за раз. Это означает, что parallel практически бесполезен для задач, связанных с процессором.

  • Я не указал, сколько потоков должно начинаться с пула. Я думаю, что по умолчанию используется количество ядер, доступных вам. Вы можете указать, сколько вы хотите начать с первого параметра, используя ThreadPool, если хотите больше.

  • Для краткости я не очищаю ThreadPool. Вы должны определенно сделать это, хотя, если вы используете это.

  • Даже если ThreadPool является частью multiprocessing, это сбивает с толку использование потоков, а не процессов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...