Python многопроцессорная очередь приводит к зависанию кода с большими данными - PullRequest
0 голосов
/ 28 января 2020

Я использую многопроцессорную обработку python для анализа некоторых больших текстов. После нескольких дней, пытаясь выяснить, почему мой код зависал (т.е. процессы не заканчивались), я смог воссоздать проблему с помощью следующего простого кода:

import multiprocessing as mp

for y in range(65500, 65600):
    print(y)

    def func(output):

         output.put("a"*y)

    if __name__ == "__main__":

        output = mp.Queue()

        process = mp.Process(target = func, args = (output,))

        process.start()

        process.join()

Как видите, если элемент для помещения в очередь становится слишком большим, процесс просто зависает. Он не останавливается, если я напишу больше кода после output.put(), он запустится, но все равно процесс никогда не остановится.

Это начинается, когда строка достигает 65500 символов, в зависимости от вашего интерпретатора, это может меняются.

Я знал, что у mp.Queue есть аргумент maxsize, но, обнаружив, я обнаружил, что он касается размера очереди по количеству элементов, а не размера самих элементов.

Есть ли способ обойти это? Данные, которые мне нужно поместить в очередь в исходном коде, очень, очень велики ...

1 Ответ

1 голос
/ 28 января 2020

Ваша очередь заполняется без пользователя, чтобы очистить ее.

Из определения Queue.put:

Если блок необязательного аргумента равен True (по умолчанию) и время ожидания равно None (по умолчанию), блокируйте при необходимости до тех пор, пока не освободится свободный слот.

Предполагается, что между производителем и потребителем невозможна тупиковая ситуация (и если исходный код действительно имеет потребитель, так как ваш образец не), в конечном итоге производители должны быть разблокированы и прекратить. Проверьте код вашего потребителя (или добавьте его к вопросу, чтобы мы могли посмотреть)


Обновление

Это не проблема, потому что очередь не имеет был задан максимальный размер, поэтому задание должно выполняться до тех пор, пока не закончится память.

Это не поведение очереди. Как указано в этом билете , здесь блокируется не сама очередь, а нижележащий канал. Из связанного ресурса (вставки между "[]" мои):

Очередь работает так: - когда вы вызываете queue.put (data), данные добавляются в deque, что может расти и уменьшаться вечно - тогда поток извлекает элементы из очереди и отправляет их так, чтобы другой процесс мог получить их через канал или сокет Unix (созданный через socketpair). Но, что немаловажно, оба канала и сокеты unix имеют ограниченную емкость (раньше она составляла 4 КБ - размер страницы - в старых ядрах Linux для каналов, теперь это 64 КБ, и между 64 КБ-120 К для сокетов unix. в зависимости от настраиваемых систем). - когда вы выполняете queue.get (), вы просто выполняете чтение по каналу / сокету

[..], когда размер [становится слишком большим], поток записи блокирует системный вызов записи. А так как соединение выполняется до удаления элемента [note: это ваш process.join], вы просто зашли в тупик, поскольку соединение ожидает завершения потока отправки, и запись не может быть завершена, так как канал / сокет заполнен! Если вы удалите элемент из очереди перед ожиданием процесса отправки, все будет нормально.


Обновление 2

Я понимаю. Но на самом деле у меня нет потребителя (если это именно то, о чем я думаю), я получу результаты из очереди только после того, как процесс завершит его помещение в очередь.

Да, это проблема. multiprocessing.Queue не является контейнером для хранения. Вы должны использовать его исключительно для передачи данных между «производителями» (процессами, которые генерируют данные, поступающие в очередь) и «потребителями (процессами, которые« используют »эти данные). Как вы теперь знаете, оставлять данные там - плохая идея .

Как я могу получить элемент из очереди, если я даже не могу поставить его туда первым?

put и get скрыть проблему помещения Соберите данные, если они заполняют канал, поэтому вам нужно всего лишь настроить al oop в вашем "основном" процессе на get элементов из очереди и, например, добавить их в список. в пространстве памяти основного процесса и не забивает трубу.

...