Предотвращение взаимоблокировок из-за переполнения очереди с помощью многопроцессорной обработки.JoinableQueue - PullRequest
0 голосов
/ 27 апреля 2019

Предположим, у нас есть multiprocessing.Pool, где рабочие потоки совместно используют multiprocessing.JoinableQueue, удаляя рабочие элементы и потенциально помещая в очередь больше работы:

def worker_main(queue):
    while True:
        work = queue.get()
        for new_work in process(work):
            queue.put(new_work)
        queue.task_done()

Когда очередь заполнится, queue.put() заблокируется. Пока есть хотя бы один процесс, читающий из очереди с queue.get(), он освободит место в очереди, чтобы разблокировать писателей. Но все процессы могут потенциально блокироваться на queue.put() одновременно.

Есть ли способ избежать такого заклинивания?

1 Ответ

1 голос
/ 27 апреля 2019

В зависимости от того, как часто process(work) создает больше элементов, может не быть решения, кроме очереди бесконечного максимального размера.

Короче говоря, ваша очередь должна быть достаточно большой, чтобы вместить весь объем работы.элементы, которые вы можете иметь в любое время.


Поскольку очередь реализована с семафорами , на самом деле может быть жесткий предел размера SEM_VALUE_MAX, который в MacOS - 32767 .Поэтому вам нужно создать подкласс этой реализации или использовать put(block=False) и обрабатывать queue.Full (например, поместить лишние элементы в другое место), если этого недостаточно.

В качестве альтернативы, посмотрите на один из 3-х.сторонние реализации распределенной очереди рабочих элементов для Python .

...