multiprocessing.Pool: Как запускать новые процессы, когда заканчиваются старые? - PullRequest
0 голосов
/ 03 октября 2018

Я использую многопроцессорный пул для управления процессами тессеракта (распознавание страниц микрофильма).Очень часто в пуле, скажем, 20 процессов тессеракта несколько страниц будут более сложными для распознавания, и, таким образом, эти процессы занимают намного больше времени, чем другие.Между тем, пул просто зависает, и большинство процессоров не используются.Я хочу, чтобы эти отставшие были оставлены для продолжения, но я также хочу запустить больше процессов, чтобы заполнить многие другие процессоры, которые сейчас бездействуют, пока заканчиваются эти несколько липких страниц.Мой вопрос: есть ли способ загрузить новые процессы, чтобы использовать эти простаивающие процессоры.Другими словами, можно ли заполнить пустые места в пуле перед ожиданием завершения всего пула?

Я мог бы использовать асинхронную версию starmap, а затем загрузить новый пул, когда текущий пул вышел из строя.определенное количество живых процессов.Но это кажется не элегантным.Было бы более элегантно автоматически сохранять слоты в процессах по мере необходимости.

Вот как мой код выглядит сейчас:

def getMpBatchMap(fileList, commandTemplate, concurrentProcesses):
    mpBatchMap = []
    for i in range(concurrentProcesses):
        fileName = fileList.readline()
        if fileName:
            mpBatchMap.append((fileName, commandTemplate))
    return mpBatchMap

def executeSystemProcesses(objFileName, commandTemplate):
    objFileName = objFileName.strip()
    logging.debug(objFileName)
    objDirName = os.path.dirname(objFileName)
    command = commandTemplate.substitute(objFileName=objFileName, objDirName=objDirName)
    logging.debug(command)
    subprocess.call(command, shell=True)

def process(FILE_LIST_FILENAME, commandTemplateString, concurrentProcesses=3):
    """Go through the list of files and run the provided command against them,
    one at a time. Template string maps the terms $objFileName and $objDirName.

    Example:
    >>> runBatchProcess('convert -scale 256 "$objFileName" "$objDirName/TN.jpg"')
    """
    commandTemplate = Template(commandTemplateString)
    with open(FILE_LIST_FILENAME) as fileList:
        while 1:
            # Get a batch of x files to process
            mpBatchMap = getMpBatchMap(fileList, commandTemplate, concurrentProcesses)
            # Process them
            logging.debug('Starting MP batch of %i' % len(mpBatchMap))
            if mpBatchMap:
                with Pool(concurrentProcesses) as p:
                    poolResult = p.starmap(executeSystemProcesses, mpBatchMap)
                    logging.debug('Pool result: %s' % str(poolResult))
            else:
                break

1 Ответ

0 голосов
/ 03 октября 2018

Вы что-то смешиваете здесь.Пул всегда поддерживает количество указанных процессов.До тех пор, пока вы не закроете пул, либо вручную, либо оставив блок with в контекстном менеджере, вам не нужно будет пополнять пул процессами, потому что они никуда не денутся.

То, что вы, вероятно, хотели сказать, это «задачи», задачи, над которыми могут работать эти процессы.Задача - это блок процесса для итерируемого, который вы передаете методам пула.И да, есть способ использовать незанятые процессы в пуле для новых задач до Все ранее поставленные в очередь задачи были обработаны.Вы уже выбрали правильный инструмент для этого - асинхронные версии методов пула.Все, что вам нужно сделать, это повторно применить какой-то метод асинхронного пула.

from multiprocessing import Pool
import os

def busy_foo(x):
    x = int(x)
    for _ in range(x):
        x - 1
    print(os.getpid(), ' returning: ', x)
    return x

if __name__ == '__main__':

    arguments1 = zip([222e6, 22e6] * 2)
    arguments2 = zip([111e6, 11e6] * 2)

    with Pool(4) as pool:

        results = pool.starmap_async(busy_foo, arguments1)
        results2 = pool.starmap_async(busy_foo, arguments2)

        print(results.get())
        print(results2.get())

Пример вывода:

3182  returning:  22000000
3185  returning:  22000000
3185  returning:  11000000
3182  returning:  111000000
3182  returning:  11000000
3185  returning:  111000000
3181  returning:  222000000
3184  returning:  222000000
[222000000, 22000000, 222000000, 22000000]
[111000000, 11000000, 111000000, 11000000]

Process finished with exit code 0

Примечание выше, процессы 3182 и 3185, которые заканчиваются сболее простая задача, немедленно начните с задач из второго списка аргументов, не дожидаясь завершения 3181 и 3184.

Если вы по какой-то причине действительно хотите использовать свежие процессы после некоторого количества обработанных задачдля каждого процесса есть параметр maxtasksperchild для Pool.Там вы можете указать, через сколько задач пул должен заменить старые процессы новыми.Значением по умолчанию для этого аргумента является None, поэтому пул не заменяет процессы по умолчанию.

...