Процесс завершается, но не может быть присоединен? - PullRequest
0 голосов
/ 05 июля 2019

Чтобы ускорить выполнение определенной задачи, я создаю подкласс Process, чтобы создать работника, который будет обрабатывать данных, поступающих в выборках.Некоторые управляющие классы будут передавать данные и читать выходные данные (используя два экземпляра Queue).Для асинхронной работы я использую put_nowait и get_nowait.В конце я отправляю специальный код завершения моему процессу, после чего он прерывает свой внутренний цикл.Однако ... этого никогда не происходит.Вот минимальный воспроизводимый пример:

import multiprocessing as mp

class Worker(mp.Process):
  def __init__(self, in_queue, out_queue):
    super(Worker, self).__init__()
    self.input_queue = in_queue
    self.output_queue = out_queue

  def run(self):
    while True:
      received = self.input_queue.get(block=True)
      if received is None:
        break
      self.output_queue.put_nowait(received)
    print("\tWORKER DEAD")


class Processor():
  def __init__(self):
    # prepare
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    worker = Worker(in_queue, out_queue)
    # get to work
    worker.start()
    in_queue.put_nowait(list(range(10**5))) # XXX
    # clean up
    print("NOTIFYING")
    in_queue.put_nowait(None)
    #out_queue.get() # XXX
    print("JOINING")
    worker.join()

Processor()

Этот код никогда не завершается, постоянно висит так:

NOTIFYING
JOINING
    WORKER DEAD

Почему?

I 'мы пометили две строки XXX.В первом случае, если я отправлю меньше данных (скажем, 10**4), все закончится нормально (процессы присоединяются, как и ожидалось).Точно так же во втором, если я get() после уведомления рабочих, чтобы закончить.Я знаю, что что-то упустил, но ничего в документации не кажется уместным.

1 Ответ

1 голос
/ 05 июля 2019

В документации упоминается, что

Когда объект помещается в очередь, объект протравливается, а фоновый поток позже сбрасывает протравленные данные в нижележащий канал.Это имеет некоторые последствия [...] После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод empty () в очереди вернет False и get_nowait () сможет вернуться без вызова queue.Empty.

https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues

и, кроме того,

всякий раз, когда вы используете очередь, вам необходимо убедиться, что все элементы, помещенные в очередь, в конечном итоге будут удалены до того, какпроцесс присоединен.В противном случае вы не можете быть уверены, что процессы, которые поместили элементы в очередь, завершатся.

https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming

Это означает, что описываемое вами поведение, вероятно, вызвано состоянием скачка между self.output_queue.put_nowait(received) в работнике и присоединением работника с worker.join() в Processers__init__.Если присоединение было быстрее, чем подача его в очередь, все заканчивается нормально.Если это было слишком медленно, в очереди есть элемент, и рабочий не присоединился бы.

Раскомментирование out_queue.get() в основном процессе приведет к опустошению очереди, что позволяет присоединиться.Но так как важно, чтобы очередь возвращалась, если очередь уже была бы пустой, использование тайм-аута может быть вариантом, чтобы попытаться переждать состояние гонки, например, out_qeue.get(timeout=10).

Возможно, важноа также для защиты основной процедуры, особенно для Windows ( многопроцессорная обработка Python в Windows, если __name__ == "__main __" )

...