Совместное использование очереди результатов между несколькими процессами - PullRequest
69 голосов
/ 28 марта 2012

В документации для модуля multiprocessing показано, как передать очередь процессу, запущенному с multiprocessing.Process. Но как я могу разделить очередь с асинхронными рабочими процессами, запущенными с apply_async? Мне не нужно динамическое объединение или что-то еще, просто способ для рабочих (неоднократно) сообщать свои результаты обратно на базу.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

Это не с: RuntimeError: Queue objects should only be shared between processes through inheritance. Я понимаю, что это значит, и я понимаю совет наследовать, а не требовать травления / расщепления (и все специальные ограничения Windows). Но как сделать , чтобы передать очередь таким образом, чтобы это работало? Я не могу найти пример, и я попробовал несколько альтернатив, которые потерпели неудачу различными способами. Помогите пожалуйста?

Ответы [ 2 ]

103 голосов
/ 29 марта 2012

Попробуйте использовать multiprocessing.Manager , чтобы управлять своей очередью, а также сделать ее доступной для разных работников.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
1 голос
/ 08 апреля 2019

multiprocessing.Pool уже имеет общую очередь результатов, нет необходимости дополнительно привлекать Manager.Queue.Manager.Queue - это queue.Queue (многопоточная очередь) под капотом, расположенная на отдельном серверном процессе и доступная через прокси.Это добавляет дополнительные издержки по сравнению с внутренней очередью пула.В отличие от собственной обработки результатов пула, результаты в Manager.Queue также не гарантированно упорядочиваются.

Рабочие процессы не начинаются с .apply_async(), это ужепроисходит, когда вы создаете экземпляр Pool.То, что началось, когда вы звоните pool.apply_async(), является новой «работой».Рабочие процессы пула запускают multiprocessing.pool.worker -функцию под капотом.Эта функция заботится о обработке новых «задач», переданных через внутренний пул Pool._inqueue, и об отправке результатов родителю через Pool._outqueue.Указанный вами func будет выполнен в течение multiprocessing.pool.worker.func должен только return что-то, и результат будет автоматически отправлен обратно родителю.

.apply_async() немедленно (асинхронно) возвращает AsyncResult объект (псевдоним для ApplyResult).Вам нужно вызвать .get() (блокирует) для этого объекта, чтобы получить фактический результат.Другой вариант - зарегистрировать функцию callback , которая запускается, как только результат становится готовым.

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Пример вывода:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Примечание:Указание timeout -параметра для .get() не остановит фактическую обработку задачи внутри работника, оно только разблокирует ожидающего родителя, подняв multiprocessing.TimeoutError.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...