Как предотвратить BrokenPipeErrors после получения SIGINT при использовании общих объектов процесса в Python? - PullRequest
0 голосов
/ 20 июня 2019

Эта программа на Python:

import concurrent.futures
import multiprocessing
import time

class A:

    def __init__(self):
        self.event = multiprocessing.Manager().Event()

    def start(self):
        try:
            while True:
                if self.event.is_set():
                    break
                print("processing")
                time.sleep(1)
        except BaseException as e:
            print(type(e).__name__ + " (from pool thread):", e)

    def shutdown(self):
        self.event.set()

if __name__ == "__main__":
    try:
        a = A()
        pool = concurrent.futures.ThreadPoolExecutor(1)
        future = pool.submit(a.start)
        while not future.done():
            concurrent.futures.wait([future], timeout=0.1)
    except BaseException as e:
        print(type(e).__name__ + " (from main thread):", e)
    finally:
        a.shutdown()
        pool.shutdown()

выводит:

processing
processing
processing
KeyboardInterrupt (from main thread):
BrokenPipeError (from pool thread): [WinError 232] The pipe is being closed
Traceback (most recent call last):
  File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 788, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\foo.py", line 34, in <module>
    a.shutdown()
  File ".\foo.py", line 21, in shutdown
    self.event.set()
  File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 1067, in set
    return self._callmethod('set')
  File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 792, in _callmethod
    self._connect()
  File "C:\Program Files\Python37\lib\multiprocessing\managers.py", line 779, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 490, in Client
    c = PipeClient(address)
  File "C:\Program Files\Python37\lib\multiprocessing\connection.py", line 691, in PipeClient
    _winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] The system cannot find the file specified

при запуске и через три секунды посылается сигнал SIGINT (при нажатии Ctrl + C ).

Анализ. - Сигнал SIGINT отправляется в главный поток каждого процесса.В этом случае есть два процесса: основной процесс и дочерний процесс менеджера.

  • В основном потоке основного процесса: после получения сигнала SIGINT обработчик сигнала по умолчанию SIGINTвызывает исключение KeyboardInterrupt, которое перехватывается и печатается.
  • В главном потоке дочернего процесса менеджера: в то же время после получения сигнала SIGINT повышается обработчик сигнала SIGINT по умолчаниюисключение KeyboardInterrupt, которое завершает дочерний процесс.Следовательно, все последующие использования общих объектов менеджера другими процессами вызывают исключение BrokenPipeError.
  • В дочернем потоке пула основного процесса: в этом случае исключение BrokenPipeError возникает в строке if self.event.is_set():.
  • В основном потоке основного процесса: Наконец, поток управления достигает линии a.shutdown(), которая вызывает исключения AttributeError и FileNotFoundError.

Как предотвратить это BrokenPipeError исключение?

1 Ответ

0 голосов
/ 20 июня 2019

Решением этой проблемы является переопределение обработчика сигнала SIGINT по умолчанию с помощью обработчика, который будет игнорировать сигнал, например, с помощью стандартного обработчика сигнала signal.SIG_IGN. Это возможно, вызвав функцию signal.signal в начале дочернего процесса менеджера:

import concurrent.futures
import multiprocessing.managers
import signal
import time

def init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

class A:

    def __init__(self):
        manager = multiprocessing.managers.SyncManager()
        manager.start(init)
        self.event = manager.Event()

    def start(self):
        try:
            while True:
                if self.event.is_set():
                    break
                print("processing")
                time.sleep(1)
        except BaseException as e:
            print(type(e).__name__ + " (from pool thread):", e)

    def shutdown(self):
        self.event.set()

if __name__ == "__main__":
    try:
        a = A()
        pool = concurrent.futures.ThreadPoolExecutor(1)
        future = pool.submit(a.start)
        while not future.done():
            concurrent.futures.wait([future], timeout=0.1)
    except BaseException as e:
        print(type(e).__name__ + " (from main thread):", e)
    finally:
        a.shutdown()
        pool.shutdown()

Примечание. - Эта программа также работает с concurrent.futures.ProcessPoolExecutor.

...