Комбинация однотипных и многопроцессорных очередей мешает упорядочению очереди - PullRequest
2 голосов
/ 16 марта 2011

Я использую следующую схему для многопроцессорной обработки:

    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    outQ.put('STOP')

    for result in iter(outQ.get, 'STOP'):
        # save result

Который работает нормально. Но если я отправлю пустой массив через outQ, 'STOP' не закончится в конце outQ, что приведет к преждевременному завершению цикла извлечения результатов.

Вот некоторый код для воспроизведения поведения.

import multiprocessing
import numpy as np

def worker(inQ, outQ):
    for i in iter(inQ.get, 'STOP'):
        result = np.random.rand(1,100)
        outQ.put(result)
        inQ.task_done()
    inQ.task_done() # for the 'STOP'

def main():
    nProcesses = 8
    data = range(1000)

    inQ = multiprocessing.JoinableQueue()
    outQ = multiprocessing.Queue()
    for item in data:
        inQ.put(item)

    for i in xrange(nProcesses):
        inQ.put('STOP')
        multiprocessing.Process(target=worker, args=(inQ, outQ)).start()

    inQ.join()
    print outQ.qsize()
    outQ.put('STOP')

    cnt = 0
    for result in iter(outQ.get, 'STOP'):
        cnt += 1
    print "got %d items" % cnt
    print outQ.qsize()

if __name__ == '__main__':
    main()

Если вы замените result = np.random.rand(1,100) на что-то вроде result = i*i, код будет работать как положено.

Что здесь происходит? Я делаю что-то в корне не так? Я ожидал, что outQ.put() после inQ.join() сделает то, что я хочу, начиная с блоков join(), пока все процессы не выполнят все put() s.

На обходном пути у меня работает цикл извлечения результатов с while outQ.qsize() > 0, который работает find. Но я прочитал qsize() не надежно. Это ненадежно только во время работы разных процессов? Будет ли для меня надежным полагаться на qsize() после выполнения inQ.join()?

Я ожидаю, что некоторые люди предложат использовать multiprocessing.Pool.map(), но я получаю ошибки рассола, когда делаю это с массивами (ndarrays).

Спасибо, что посмотрели!

Ответы [ 2 ]

1 голос
/ 17 марта 2011

Поскольку вы знаете, сколько элементов ожидать от outQ, другой обходной путь будет заключаться в явном ожидании этого количества элементов:

import multiprocessing as mp
import numpy as np
import Queue

N=100

def worker(inQ, outQ):
    while True:
        i,item=inQ.get()
        result = np.random.rand(1,N)
        outQ.put((i,result))
        inQ.task_done()

def main():
    nProcesses = 8
    data = range(N)
    inQ = mp.JoinableQueue()
    outQ = mp.Queue()    

    for i,item in enumerate(data):
        inQ.put((i,item))

    for i in xrange(nProcesses):
        proc=mp.Process(target=worker, args=[inQ, outQ])
        proc.daemon=True
        proc.start()

    inQ.join()
    cnt=0
    for _ in range(N):
        result=outQ.get()
        print(result)
        cnt+=1
        print(cnt)      
    print('got {c} items'.format(c=cnt))

if __name__ == '__main__':
    main()
1 голос
/ 16 марта 2011

массивы numpy используют богатые сравнения.Таким образом, == 'STOP' возвращает массив NumPy, а не Bool, и этот массив NUMPY не может быть приведен к BOOL.Под прикрытием iter (outQ.get, 'STOP') выполняет только это сравнение и, возможно, рассматривает исключение, когда пытается преобразовать результат в логическое значение как False.Вам нужно будет выполнить ручной цикл while, извлечь элементы из очереди, проверить, есть ли isinstance (item, basestring), прежде чем сравнивать его с «STOP».

while True:
    item = outQ.get()
    if isinstance(item, basestring) and item == 'STOP':
        break
    cnt += 1

Проверка qsize () также может сработатьхорошо, потому что никакой другой процесс не добавляет в очередь после присоединения входной очереди.

...