Давайте сначала посмотрим на конец программы.
Модуль многопроцессорной обработки использует atexit
для вызова multiprocessing.util._exit_function
, когда ваша программа заканчивается.
Если вы удалите g2.next()
, вашПрограмма быстро заканчивается.
_exit_function
в конечном итоге вызывает Pool._terminate_pool
.Основной поток изменяет состояние pool._task_handler._state
с RUN
на TERMINATE
.Тем временем поток pool._task_handler
зацикливается на Pool._handle_tasks
и выдает его, когда достигает условия
if thread._state:
debug('task handler found thread._state != RUN')
break
(См. /Usr/lib/python2.6/multiprocessing/pool.py)
Это то, что останавливает обработчик задач от полного использования вашего генератора, g()
.Если вы посмотрите на Pool._handle_tasks
, вы увидите
for i, task in enumerate(taskseq):
...
try:
put(task)
except IOError:
debug('could not put task on queue')
break
Это код, который потребляет ваш генератор.(taskseq
не совсем ваш генератор, но поскольку потребляется taskseq
, ваш генератор тоже.)
Напротив, когда вы вызываете g2.next()
, основной поток вызывает IMapIterator.next
и ждет, когдаоно достигает self._cond.wait(timeout)
.
То, что основной поток ожидает вместо вызова _exit_function
, - это то, что позволяет потоку обработчика задач работать нормально, что означает полное использование генератора, так как он put
s выполняет задачи в worker
s 'inqueue
в функции Pool._handle_tasks
.
Суть в том, что все функции карты Pool
потребляют всю итерацию, которая ей дана.Если вы хотите использовать генератор порциями, вы можете сделать это вместо этого:
import multiprocessing as mp
import itertools
import time
def g():
for el in xrange(50):
print el
yield el
def f(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
pool = mp.Pool(processes=4) # start 4 worker processes
go = g()
result = []
N = 11
while True:
g2 = pool.map(f, itertools.islice(go, N))
if g2:
result.extend(g2)
time.sleep(1)
else:
break
print(result)