Хорошо, так как в настоящее время нет ответа, я не чувствую себя слишком плохо, делая это.Хотя я до сих пор интересуюсь тем, что на самом деле происходит за кулисами, чтобы вызвать эту проблему, мои самые неотложные вопросы - те, которые указаны в обновлении 2. Это:
Каковы различия между JoinableQueue
иManager().Queue()
(и когда вы должны использовать один поверх другого?).И что важно, безопасно ли заменить один на другой, в этом примере?
В следующем коде у меня есть простой пул процессов.Каждому процессу передается очередь процесса (pq
) для извлечения данных для обработки и очередь возвращаемых значений (rq
) для передачи возвращенных значений обработки обратно в основной поток.Если я не добавляю в очередь возвращаемых значений это работает, но как только я это сделаю, по какой-то причине процессы блокируются от остановки.В обоих случаях процессы run
возвращают методы, так что это не put
при блокировке очереди возврата, но во втором случае сами процессы не завершаются, поэтому программа блокируется, когда я join
на процессах.Почему это так?
Обновления:
Похоже, что-то связано с количеством элементов в очереди. По крайней мере, на моей машине у меня может быть до 6570 элементов в очереди, и это на самом деле работает, но даже больше, чем это, и оно блокируется.
Кажется, что работает сManager().Queue()
. Будь то ограничение JoinableQueue
или просто я неправильно понимаю различия между этими двумя объектами, я обнаружил, что если я заменю очередь возврата на Manager().Queue()
, она будет работать, как и ожидалось.Каковы различия между ними, и когда вы должны использовать один над другим?
Ошибка не возникает, если я потребляю с rq
Oop.На мгновение здесь был ответ, и когда я его комментировал, он исчез.Во всяком случае, одна из вещей, которые он сказал, это вопрос о том, если я добавлю потребителя, эта ошибка все еще происходит.Я попробовал это, и ответ: нет, это не так.
Другая вещь, которую он упомянул, была цитата из многопроцессорная документация как возможный ключ к проблеме.Ссылаясь на JoinableQueue
, он говорит:
... семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.
import multiprocessing
class _ProcSTOP:
pass
class Proc(multiprocessing.Process):
def __init__(self, pq, rq):
self._pq = pq
self._rq = rq
super().__init__()
print('++', self.name)
def run(self):
dat = self._pq.get()
while not dat is _ProcSTOP:
# self._rq.put(dat) # uncomment me for deadlock
self._pq.task_done()
dat = self._pq.get()
self._pq.task_done()
print('==', self.name)
def __del__(self):
print('--', self.name)
if __name__ == '__main__':
pq = multiprocessing.JoinableQueue()
rq = multiprocessing.JoinableQueue()
pool = []
for i in range(4):
p = Proc(pq, rq)
p.start()
pool.append(p)
for i in range(10000):
pq.put(i)
pq.join()
for i in range(4):
pq.put(_ProcSTOP)
pq.join()
while len(pool) > 0:
print('??', pool)
pool.pop().join() # hangs here (if using rq)
print('** complete')
Пример вывода, без использования очереди возврата:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4
Пример вывода, с использованием очереди возврата:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs