Многопроцессорная обработка Python. Queue + Process: правильное завершение обеих программ - PullRequest
2 голосов
/ 19 октября 2011

Учитывая эту программу Python:

# commented out code are alternatives I tried that don't work.

from multiprocessing import Process, Queue
#from multiprocessing import Process, JoinableQueue as Queue

def start_process(queue):
    # queue.cancel_join_thread()
    while True:
        print queue.get()

if __name__ == '__main__':
    queue = Queue()
    # queue.cancel_join_thread()

    process = Process(target=start_process, args=(queue,))
    process.start()

    queue.put(12)
    process.join()

Когда я убиваю эту программу с помощью CTRL-C, это происходит:

$> python queuetest.py
12
^CTraceback (most recent call last):
  File "queuetest.py", line 19, in <module>
    process.join()
  File ".../python2.7/multiprocessing/process.py", line 119, in join
    res = self._popen.wait(timeout)
Process Process-1:
  File ".../python2.7/multiprocessing/forking.py", line 122, in wait
Traceback (most recent call last):
    return self.poll(0)
  File ".../python2.7/multiprocessing/forking.py", line 107, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File ".../python2.7/multiprocessing/process.py", line 232, in _bootstrap
KeyboardInterrupt
    self.run()
  File ".../python2.7/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "queuetest.py", line 9, in start_process
    print queue.get()
  File ".../python2.7/multiprocessing/queues.py", line 91, in get
    res = self._recv()
KeyboardInterrupt

.. как правильно завершить две обработки по сигналу?

Чего я хочу добиться: в моей неминимальной программе второй процесс содержит SocketServer и нуждается в дополнительном интерактивном интерфейсе командной строки.

1 Ответ

3 голосов
/ 20 октября 2011

Решение состоит в том, чтобы отправить определенное сообщение (например, строку 'exit' в моем образце) через очередь, чтобы завершить нормальный рабочий (дочерний процесс).Поскольку сигнал CTRL-C отправляется всем дочерним элементам, мы должны его игнорировать.Вот пример кода:

from multiprocessing import Process, Queue

def start_process(queue):
  while True:
    try:
        m = queue.get()
        if m == 'exit':
            print 'cleaning up worker...'
            # add here your cleaning up code
            break
        else:
            print m
    except KeyboardInterrupt:
        print 'ignore CTRL-C from worker'


if __name__ == '__main__':
  queue = Queue()

  process = Process(target=start_process, args=(queue,))
  process.start()

  queue.put(12)

  try:
    process.join()
  except KeyboardInterrupt:
    print 'wait for worker to cleanup...'
    queue.put('exit')
    process.join()

    ## or to kill anyway the worker if is not terminated after 5 seconds ...
    ## process.join(5)
    ## if process.is_alive():
    ##     process.terminate()
...