Я использую следующую схему для многопроцессорной обработки:
for item in data:
inQ.put(item)
for i in xrange(nProcesses):
inQ.put('STOP')
multiprocessing.Process(target=worker, args=(inQ, outQ)).start()
inQ.join()
outQ.put('STOP')
for result in iter(outQ.get, 'STOP'):
# save result
Который работает нормально. Но если я отправлю пустой массив через outQ
, 'STOP'
не закончится в конце outQ
, что приведет к преждевременному завершению цикла извлечения результатов.
Вот некоторый код для воспроизведения поведения.
import multiprocessing
import numpy as np
def worker(inQ, outQ):
for i in iter(inQ.get, 'STOP'):
result = np.random.rand(1,100)
outQ.put(result)
inQ.task_done()
inQ.task_done() # for the 'STOP'
def main():
nProcesses = 8
data = range(1000)
inQ = multiprocessing.JoinableQueue()
outQ = multiprocessing.Queue()
for item in data:
inQ.put(item)
for i in xrange(nProcesses):
inQ.put('STOP')
multiprocessing.Process(target=worker, args=(inQ, outQ)).start()
inQ.join()
print outQ.qsize()
outQ.put('STOP')
cnt = 0
for result in iter(outQ.get, 'STOP'):
cnt += 1
print "got %d items" % cnt
print outQ.qsize()
if __name__ == '__main__':
main()
Если вы замените result = np.random.rand(1,100)
на что-то вроде result = i*i
, код будет работать как положено.
Что здесь происходит? Я делаю что-то в корне не так? Я ожидал, что outQ.put()
после inQ.join()
сделает то, что я хочу, начиная с блоков join()
, пока все процессы не выполнят все put()
s.
На обходном пути у меня работает цикл извлечения результатов с while outQ.qsize() > 0
, который работает find. Но я прочитал qsize()
не надежно. Это ненадежно только во время работы разных процессов? Будет ли для меня надежным полагаться на qsize()
после выполнения inQ.join()
?
Я ожидаю, что некоторые люди предложат использовать multiprocessing.Pool.map()
, но я получаю ошибки рассола, когда делаю это с массивами (ndarrays).
Спасибо, что посмотрели!