Заставить многопроцессорный пул перебирать аргументы - PullRequest
0 голосов
/ 20 октября 2018

Я использую многопроцессорный пул для многократного запуска функции для нескольких аргументов.Я использую список для заданий, заполненных другим потоком, и функцию job_handler для обработки каждого задания.Моя проблема в том, что когда список станет пустым, пул завершит работу функции.Я хочу, чтобы бассейн оставался в живых и ждал, пока список не заполнится.На самом деле, есть два сценария для решения этой проблемы.

1.Используйте один пул, но он закончится после того, как список станет пустым:

from multiprocessing import Pool
from threading import Thread
from time import sleep


def job_handler(i):
    print("Doing job:", i)
    sleep(0.5)

def job_adder():
    i = 0
    while True:
        jobs.append(i)
        i += 1
        sleep(0.1)


if __name__ == "__main__":
    pool = Pool(4)
    jobs = []
    thr = Thread(target=job_adder)
    thr.start()
    # wait for job_adder to add to list
    sleep(1)
    pool.map_async(job_handler, jobs)
    while True:
        pass

2.Multiple map_async:

from multiprocessing import Pool
from threading import Thread
from time import sleep


def job_handler(i):
    print("Doing job:", i)
    sleep(0.5)

def job_adder():
    i = 0
    while True:
        jobs.append(i)
        i += 1
        sleep(0.1)


if __name__ == "__main__":
    pool = Pool(4)
    jobs = []
    thr = Thread(target=job_adder)
    thr.start()
    while True:
        for job in jobs:
            pool1 = pool.map_async(job_handler, (job,))
            jobs.remove(job)

В чем разница между двумя?Я думаю, что первый вариант будет лучше, потому что сама карта будет обрабатывать итерацию.Моя цель - повысить производительность при выполнении каждой работы в отдельности.

1 Ответ

0 голосов
/ 21 октября 2018

Необходимость «замедлить» Pool возникает в ряде ситуаций.Этот случай проще, чем некоторые :

q=queue.Queue()
m=pool.imap(iter(q.get,None))

Вы также можете использовать imap_unordered;None является стражем для прекращения действия Pool.Pool должен использовать поток для сбора задач (поскольку эти функции «ленивее [чем] map()»), и он будет блокироваться на q при необходимости.

...