Python Multiprocessing.Pool ленивая итерация - PullRequest
51 голосов
/ 16 марта 2011

Мне интересно, как класс Python Multiprocessing.Pool работает с map, imap и map_async. Моя конкретная проблема заключается в том, что я хочу отобразить на итераторе, который создает объекты с большим объемом памяти, и не хочу, чтобы все эти объекты создавались в памяти одновременно. Я хотел посмотреть, не вызовут ли различные функции map () мой итератор, или он будет разумно вызывать функцию next () только при постепенном продвижении дочерних процессов, поэтому я взломал некоторые тесты как таковые:

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()

И так далее с map, imap и map_async. Однако это самый вопиющий пример: простой вызов next () один раз на g2 выводит все мои элементы из моего генератора g (), тогда как если бы imap делал это «лениво», я бы ожидал, что он вызовет только go.next () один раз и, следовательно, выведите только «1».

Может ли кто-нибудь выяснить, что происходит, и если есть какой-то способ, чтобы пул процессов "лениво" оценивал итератор по мере необходимости?

Спасибо

Гейб

Ответы [ 3 ]

32 голосов
/ 16 марта 2011

Давайте сначала посмотрим на конец программы.

Модуль многопроцессорной обработки использует atexit для вызова multiprocessing.util._exit_function, когда ваша программа заканчивается.

Если вы удалите g2.next(), вашПрограмма быстро заканчивается.

_exit_function в конечном итоге вызывает Pool._terminate_pool.Основной поток изменяет состояние pool._task_handler._state с RUN на TERMINATE.Тем временем поток pool._task_handler зацикливается на Pool._handle_tasks и выдает его, когда достигает условия

            if thread._state:
                debug('task handler found thread._state != RUN')
                break

(См. /Usr/lib/python2.6/multiprocessing/pool.py)

Это то, что останавливает обработчик задач от полного использования вашего генератора, g().Если вы посмотрите на Pool._handle_tasks, вы увидите

        for i, task in enumerate(taskseq):
            ...
            try:
                put(task)
            except IOError:
                debug('could not put task on queue')
                break

Это код, который потребляет ваш генератор.(taskseq не совсем ваш генератор, но поскольку потребляется taskseq, ваш генератор тоже.)

Напротив, когда вы вызываете g2.next(), основной поток вызывает IMapIterator.next и ждет, когдаоно достигает self._cond.wait(timeout).

То, что основной поток ожидает вместо вызова _exit_function, - это то, что позволяет потоку обработчика задач работать нормально, что означает полное использование генератора, так как он put s выполняет задачи в worker s 'inqueue в функции Pool._handle_tasks.

Суть в том, что все функции карты Pool потребляют всю итерацию, которая ей дана.Если вы хотите использовать генератор порциями, вы можете сделать это вместо этого:

import multiprocessing as mp
import itertools
import time


def g():
    for el in xrange(50):
        print el
        yield el


def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)
4 голосов
/ 19 июня 2014

То, что вы хотите, реализовано в пакете NuMap с веб-сайта:

NuMap является параллельным (на основе потоков или процессов, локальным или удаленным), буферизированным, multi-task, itertools.imap или multiprocessing.Pool.imap замена функции.Как и imap, он оценивает функцию на элементах последовательности или итерируемой, и делает это лениво.Лень можно отрегулировать с помощью аргументов «шага» и «буфера».

4 голосов
/ 23 апреля 2013

У меня тоже была эта проблема, и я был разочарован, узнав, что карта потребляет все ее элементы. Я кодировал функцию, которая лениво использует итератор, используя тип данных Queue в многопроцессорной обработке. Это похоже на то, что @unutbu описывает в комментарии к своему ответу, но, как он указывает, страдает отсутствием механизма обратного вызова для повторной загрузки очереди. Вместо этого тип данных Queue предоставляет параметр timeout, и я использовал 100 миллисекунд для хорошего эффекта.

from multiprocessing import Process, Queue, cpu_count
from Queue import Full as QueueFull
from Queue import Empty as QueueEmpty

def worker(recvq, sendq):
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)

def pool_imap_unordered(function, iterable, procs=cpu_count()):
    # Create queues for sending/receiving items from iterable.

    sendq = Queue(procs)
    recvq = Queue()

    # Start worker processes.

    for rpt in xrange(procs):
        Process(target=worker, args=(sendq, recvq)).start()

    # Iterate iterable and communicate with worker processes.

    send_len = 0
    recv_len = 0
    itr = iter(iterable)

    try:
        value = itr.next()
        while True:
            try:
                sendq.put((function, value), True, 0.1)
                send_len += 1
                value = itr.next()
            except QueueFull:
                while True:
                    try:
                        result = recvq.get(False)
                        recv_len += 1
                        yield result
                    except QueueEmpty:
                        break
    except StopIteration:
        pass

    # Collect all remaining results.

    while recv_len < send_len:
        result = recvq.get()
        recv_len += 1
        yield result

    # Terminate worker processes.

    for rpt in xrange(procs):
        sendq.put(None)

Преимущество этого решения заключается в том, что он не группирует запросы к Pool.map. Один отдельный работник не может помешать другим добиться прогресса. YMMV. Обратите внимание, что вы можете использовать другой объект для оповещения об увольнении работников. В примере я использовал None.

Проверено на "Python 2.7 (r27: 82525, 4 июля 2010, 09:01:59) [MSC v.1500 32 бит (Intel)] на win32"

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...