Вот быстрое подтверждение концепции. Его можно использовать следующим образом:
graph = sequence(
lambda: print(1),
lambda: print(2),
parallel(
lambda: print(3),
lambda: print(4),
sequence(
lambda: print(5),
lambda: print(6))),
lambda: print(7)
graph()
1
2
3
5
6
4
7
sequence
создает функцию, которая оборачивает цикл for
, а parallel
создает функцию, которая оборачивает использование пула потоков:
from typing import Callable
from multiprocessing.pool import ThreadPool
Task = Callable[[], None]
_pool: ThreadPool = ThreadPool()
def sequence(*tasks: Task) -> Task:
def run():
for task in tasks:
task()
return run # Returning "run" to be used as a task by other "sequence" and "parallel" calls
def parallel(*tasks: Task) -> Task:
def run():
_pool.map(lambda f: f(), tasks) # Delegate to a pool used for IO tasks
return run
Каждый вызов sequence
и parallel
возвращает новую «задачу» (функция, не имеющая аргументов и ничего не возвращающая). Затем эта задача может быть вызвана другими внешними вызовами sequence
и parallel
.
Что нужно отметить о ThreadPool
:
. используйте пул потоков для parallel
, из-за GIL он все равно будет выполнять только одну вещь за раз. Это означает, что parallel
практически бесполезен для задач, связанных с процессором.
Я не указал, сколько потоков должно начинаться с пула. Я думаю, что по умолчанию используется количество ядер, доступных вам. Вы можете указать, сколько вы хотите начать с первого параметра, используя ThreadPool
, если хотите больше.
Для краткости я не очищаю ThreadPool
. Вы должны определенно сделать это, хотя, если вы используете это.
Даже если ThreadPool
является частью multiprocessing
, это сбивает с толку использование потоков, а не процессов.