Как использовать Queue для многопроцессорной обработки с Python? - PullRequest
0 голосов
/ 02 марта 2020

Эта программа работает нормально, она должна вывести: 0 1 2 3.

from multiprocessing import Process, Queue

NTHREADS = 4

def foo(queue, id):
    queue.put(id)

if __name__ == '__main__':
    queue = Queue()
    procs = []
    for id in range(NTHREADS):
        procs.append(Process(target=foo, args=(queue, id)))

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    while not queue.empty():
        print(queue.get())

Но не с этим. Я думаю, что он останавливается после join ().

from multiprocessing import Process, Queue
from PIL import Image

NTHREADS = 4

def foo(queue):
    img = Image.new('RGB', (200,200), color=(255,0,0))
    queue.put(img)

if __name__ == '__main__':
    queue = Queue()
    procs = []
    for i in range(NTHREADS):
        procs.append(Process(target=foo, args=(queue,)))

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

    while not queue.empty():
        print(queue.get().size)

Почему? Как я могу дойти до конца? Как я могу получить свое изображение? Я хотел бы поработать над четырьмя изображениями параллельно, а затем объединить их в одно окончательное изображение.

1 Ответ

1 голос
/ 02 марта 2020

Очереди сложные звери под одеялом. Когда объект (pickle of) помещается в очередь, его части подаются в механизм межпроцессного взаимодействия ОС, а остальное остается в буфере Python в памяти, чтобы избежать перегрузки средств ОС. Вещество в буфере памяти подается в механизм ОС, поскольку принимающая сторона освобождает место для большего, убирая содержимое из очереди.

Следствием этого является то, что рабочий процесс не может завершиться до его завершения. буферы памяти (входящие в очереди) пусты.

В вашей первой программе целые числа настолько малы, что буферы памяти не вступают в игру. Рабочий за один глоток подает весь рассол в ОС, и работник может затем выйти.

Но во второй программе соленья намного больше. Рабочий отправляет часть рассола в ОС, а затем ждет, пока основная программа отключит механизм ОС, чтобы он мог передать следующую часть рассола. Поскольку ваша программа никогда ничего не удаляет из очереди перед вызовом .join(), рабочие ждут вечно.

В общем, это правило: никогда не пытайтесь .join(), пока все очереди не будут опустошены.

Обратите внимание на это из документов :

Предупреждение. Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и он не использовал JoinableQueue. cancel_join_thread), то этот процесс не завершится, пока все буферизованные элементы не будут сброшены в канал. Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете получить тупик, если не уверены, что все элементы, помещенные в очередь, были использованы.

Кроме того, queue.empty() - плохой способ чтобы проверить это. Это может только сказать вам, находятся ли данные в очереди в тот момент, когда они происходят. При параллельной обработке это в лучшем случае вероятностное приближение c к истине. Во втором примере вы точно знаете, сколько элементов вы ожидаете получить из очереди, поэтому этот способ будет надежным:

for proc in procs:
    proc.start()

for i in range(NTHREADS):
    print(queue.get().size)

for proc in procs: # join AFTER queue is drained
    proc.join()
...