Есть ли способ ограничить, сколько будет представлено в пул работников? - PullRequest
0 голосов
/ 30 октября 2018

У меня есть Пул рабочих, и я использую apply_async, чтобы передать им работу. Мне плевать на результат функции, примененной к каждому элементу. Кажется, пул принимает любое количество вызовов apply_async, независимо от того, насколько велики данные или насколько быстро работники могут справиться с работой.

Есть ли способ сделать блок apply_async, как только определенное количество элементов ожидает обработки? Я уверен, что внутренне, пул использует Очередь, так что было бы тривиально просто использовать максимальный размер для Очереди?

Если это не поддерживается, имеет ли смысл представлять большой отчет, потому что это выглядит как базовая функциональность и довольно тривиально добавить?

Было бы стыдно, если бы по сути пришлось заново реализовать всю логику пула, просто чтобы заставить эту работу.

Вот очень простой код:

from multiprocessing import Pool
dowork(item):
    # process the item (for side effects, no return value needed)
    pass 

pool = Pool(nprocesses)
for work in getmorework():
    # this should block if we already have too many work waiting!        
    pool.apply_async(dowork, (work,))
pool.close()
pool.join()

Ответы [ 2 ]

0 голосов
/ 03 ноября 2018

альтернативой может быть использование Queue напрямую:

from multiprocessing import Process, JoinableQueue
from time import sleep
from random import random

def do_work(i):
    print(f"worker {i}")
    sleep(random())
    print(f"done {i}")

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

def generator(n):
    for i in range(n):
        print(f"gen {i}")
        yield i

# 1 = allow generator to get this far ahead
q = JoinableQueue(1)

# 2 = maximum amount of parallelism
procs = [Process(target=worker) for _ in range(2)]
# and get them running
for p in procs:
    p.daemon = True
    p.start()

# schedule 10 items for processing
for item in generator(10):
    q.put(item)

# wait for jobs to finish executing
q.join()

# signal workers to finish up
for p in procs:
    q.put(None)
# wait for workers to actually finish
for p in procs:
    p.join()

в основном украдено из примера модуля Python queue:

https://docs.python.org/3/library/queue.html#queue.Queue.join

0 голосов
/ 31 октября 2018

Так что-то вроде этого?

import multiprocessing
import time

worker_count = 4
mp = multiprocessing.Pool(processes=worker_count)
workers = [None] * worker_count

while True:
    try:
        for i in range(worker_count):
            if workers[i] is None or workers[i].ready():
                workers[i] = mp.apply_async(dowork, args=next(getmorework()))
    except StopIteration:
        break
    time.sleep(1)

Я не знаю, как быстро вы ожидаете, что каждый работник закончит, time.sleep может или не может быть необходимым или может потребоваться другое время или что-то еще.

...