Python: Как я могу проверить количество ожидающих задач в многопроцессорной обработке. - PullRequest
9 голосов
/ 04 апреля 2011

У меня небольшой пул работников (4) и очень большой список заданий (5000 ~).Я использую пул и отправляю задачи с map_async ().Поскольку задача, которую я выполняю, довольно длинная, я задаю размер фрагмента 1, чтобы один длинный процесс не мог задержать несколько более коротких.

Я бы хотел периодически проверять, какмногие задачи оставлены для представления.Я знаю, что максимум 4 будут активны, я обеспокоен тем, сколько осталось обработать.

Я погуглил и не могу найти никого, кто делает это.

Некоторые простыекод в помощь:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

Ответы [ 4 ]

7 голосов
/ 05 апреля 2011

Похоже, job._number_left это то, что вы хотите. _ указывает, что это внутреннее значение, которое может измениться по прихоти разработчиков, но, похоже, это единственный способ получить эту информацию.

1 голос
/ 20 августа 2015

У меня схожие требования: отслеживать прогресс, выполнять промежуточную работу на основе результатов, полностью останавливать всю обработку в любое произвольное время Как я справлялся с этим - отправлять задачи по одному с apply_async. Сильно упрощенная версия того, что я делаю:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

Обратите внимание, что я использую Queue вместо return результатов.

1 голос
/ 04 апреля 2011

Нет воздухонепроницаемого способа, о котором я знаю, но если вы используете функцию Pool.imap_unordered() вместо map_async, вы можете перехватывать обрабатываемые элементы.

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

Я вычитаю process_count,потому что вы можете в значительной степени предполагать, что все процессы будут обрабатываться с одним из двух исключений: 1) если вы используете итератор, возможно, не останется больше элементов для использования и обработки, и 2) у вас может быть меньше 4 элементов.Я не кодировал для первого исключения.Но это должно быть довольно легко сделать, если вам нужно.В любом случае, в вашем примере используется список, поэтому у вас не должно быть этой проблемы.

Редактировать: Я также понял, что вы используете цикл While, который создает впечатление, будто вы пытаетесь что-то периодически обновлять, скажем,, каждые полсекунды или что-то.Код, который я привел в качестве примера, не сделает это таким образом.Я не уверен, что это проблема.

0 голосов
/ 24 августа 2018

Вы можете проверить количество ожидающих заданий, увидев атрибут Pool._cache, предполагая, что вы используете apply_async.Здесь хранится ApplyResult до тех пор, пока они не станут доступны, и равно числу ожидающих ApplyResult s.

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
...