Подпроцесс завершается, но все еще не завершается, вызывая взаимоблокировку - PullRequest
6 голосов
/ 06 ноября 2011

Хорошо, так как в настоящее время нет ответа, я не чувствую себя слишком плохо, делая это.Хотя я до сих пор интересуюсь тем, что на самом деле происходит за кулисами, чтобы вызвать эту проблему, мои самые неотложные вопросы - те, которые указаны в обновлении 2. Это:

Каковы различия между JoinableQueue иManager().Queue() (и когда вы должны использовать один поверх другого?).И что важно, безопасно ли заменить один на другой, в этом примере?


В следующем коде у меня есть простой пул процессов.Каждому процессу передается очередь процесса (pq) для извлечения данных для обработки и очередь возвращаемых значений (rq) для передачи возвращенных значений обработки обратно в основной поток.Если я не добавляю в очередь возвращаемых значений это работает, но как только я это сделаю, по какой-то причине процессы блокируются от остановки.В обоих случаях процессы run возвращают методы, так что это не put при блокировке очереди возврата, но во втором случае сами процессы не завершаются, поэтому программа блокируется, когда я join на процессах.Почему это так?

Обновления:

  1. Похоже, что-то связано с количеством элементов в очереди. По крайней мере, на моей машине у меня может быть до 6570 элементов в очереди, и это на самом деле работает, но даже больше, чем это, и оно блокируется.

  2. Кажется, что работает сManager().Queue(). Будь то ограничение JoinableQueue или просто я неправильно понимаю различия между этими двумя объектами, я обнаружил, что если я заменю очередь возврата на Manager().Queue(), она будет работать, как и ожидалось.Каковы различия между ними, и когда вы должны использовать один над другим?

  3. Ошибка не возникает, если я потребляю с rqOop.На мгновение здесь был ответ, и когда я его комментировал, он исчез.Во всяком случае, одна из вещей, которые он сказал, это вопрос о том, если я добавлю потребителя, эта ошибка все еще происходит.Я попробовал это, и ответ: нет, это не так.

    Другая вещь, которую он упомянул, была цитата из многопроцессорная документация как возможный ключ к проблеме.Ссылаясь на JoinableQueue, он говорит:

    ... семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.


import multiprocessing

class _ProcSTOP:
    pass

class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq
        self._rq = rq
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while not dat is _ProcSTOP:
#            self._rq.put(dat)        # uncomment me for deadlock
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done() 
        print('==', self.name)

    def __del__(self):
        print('--', self.name)

if __name__ == '__main__':

    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq) 
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()

    for i in range(4):
       pq.put(_ProcSTOP)

    pq.join()

    while len(pool) > 0:
        print('??', pool)
        pool.pop().join()    # hangs here (if using rq)

    print('** complete')

Пример вывода, без использования очереди возврата:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4

Пример вывода, с использованием очереди возврата:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs

1 Ответ

0 голосов
/ 06 ноября 2011

Из документации :

Внимание

Как упомянуто выше, если дочерний процесс поместил элементы в очередь (и он не использовал JoinableQueue.cancel_join_thread ()), то этот процесс не завершится, пока все буферизованные элементы не будут сброшены в канал.

Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете зайти в тупик, если вы не уверены, что все элементы, помещенные в очередь, были использованы. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависать при выходе, когда он пытается присоединиться ко всем своим недемоническим дочерним процессам.

Обратите внимание, что в очереди, созданной с помощью диспетчера, этой проблемы нет. См. Руководство по программированию.

Таким образом, JoinableQueue () использует канал и будет ждать, пока он не сбросит все данные, прежде чем закрыться.

С другой стороны, объект Manager.Queue () использует совершенно другой подход. Менеджеры запускают отдельный процесс, который немедленно получает все данные (и сохраняет их в своей памяти).

Менеджеры предоставляют способ создания данных, которые могут быть разделены между различными процессами. Управляющий объект управляет процессом сервера, который управляет общими объектами. Другие процессы могут получать доступ к общим объектам с помощью прокси.

...

* * Очередь тысячи двадцать-три ([MAXSIZE]) Создайте общий объект Queue.Queue и верните для него прокси.
...