Python3 Как корректно завершить работу многопроцессорного приложения - PullRequest
0 голосов
/ 08 июля 2019

Я пытаюсь исправить приложение python3, в котором создаются несколько процессов и потоков, управляемых различными очередями и каналами. Я пытаюсь сделать форму контролируемого выхода, когда кто-то пытается сломать программу с помощью Ctrl-C. Однако не важно, что я делаю, это всегда висит только в конце.

Я пытался использовать исключение прерывания клавиатуры и перехват сигнала Приведенный ниже код является частью многопроцессорного кода.

from multiprocessing import Process, Pipe, JoinableQueue as Queue, Event

class TaskExecutor(Process):
  def __init__(....)
    {inits}

  def signal_handler(self, sig, frame):
    print('TaskExecutor closing')
    self._in_p.close()
    sys.exit(1)

  def run
    signal.signal(signal.SIGINT, self.signal_handler)
    signal.signal(signal.SIGTERM, self.signal_handler)
    while True:
      # Get the Task Groupe name from the Task queue.
      try:
        ExecCmd = self._in_p.recv() # type: TaskExecCmd
      except Exceptions as e:
        self._in_p.close()
        return 
      if ExecCmd.Kill:
        self._log.info('{:30} : Kill Command received'.format(self.name))
        self._in_p.close()
        return
      else 
    {other code executing here}

Я получаю отпечаток выше, который закрывается. но я все еще получаю много разных исключений, которые я пытаюсь поймать, но это не так.

Я ищу некоторую документацию о том, как и в каком порядке завершать многопроцессорную работу и ее основной процесс.

Я знаю, что это очень общий вопрос, но это очень большое приложение, поэтому, если есть какие-либо вопросы или вещи, которые я мог бы проверить, я мог бы сузить их.

Привет

1 Ответ

0 голосов
/ 12 июля 2019

Таким образом, после дальнейшего изучения этой проблемы я обнаружил, что в ситуации, когда у меня был поток канала, поток очереди и 4 многопроцессорных запуска.Количество этих процессов может закончиться зависанием при завершении приложения с помощью ctrl-c.Процесс Pipe and Queue уже завершен.

В многопроцессорной документации есть предупреждение.

Предупреждение Если этот метод используется, когда связанный процесс использует канал или очередь, то этот канал или очередь могут быть повреждены и могут стать непригодными для использования другим процессом.Точно так же, если процесс получил блокировку или семафор и т. Д., То его завершение может вызвать взаимную блокировку других процессов.

И я думаю, что это именно то, что происходит.Я также обнаружил, что, несмотря на то, что в моем многопроцессорном классе есть механизм выключения, потоки, которые все еще выполняются, могут быть сочтены живыми (чтение is_alive () ), хотя я знаю, что run () метод возврата IE внутри сома висел.

Теперь о решении.Мои мультипроцессы были для дизайна, а не для Deamon, потому что я хотел контролировать их уничтожение.Однако я изменил их на Деймон, чтобы они всегда были убиты.Сначала я добавил, что любой сигнал уничтожения будет вызывать исключение ProgramKilled во всей моей программе.

def signal_handler(signum, frame):
  raise ProgramKilled('Task Executor killed')

Затем я изменил свой механизм выключения в своем многопроцессорном классе на

while True:
  # Get the Task Groupe name from the Task queue.
  try:
    # Reading from pipe
    ExecCmd = self._in_p.recv() # type: TaskExecCmd
  # If fatal error just close it all
  except BrokenPipe:
    break
  # This can occure close the pipe and break the loop
  except EOFError:
    self._in_p.close()
    break
  # Exception for when a kill signal is detected
  # Set the multiprocess as killed (just waiting for the kill command from main)
  except ProgramKilled:
    self._log.info('{:30} : Died'.format(self.name))
    self._KilledStatus = True
    continue
  # kill command from main recieved 
  # Shut down all we can. Ignore exceptions 
  if ExecCmd.Kill:
    self._log.info('{:30} : Kill Command received'.format(self.name))
    try:
      self._in_p.close()
      self._out_p.join()
    except Exception:
      pass
    self._log.info('{:30} : Kill Command executed'.format(self.name))
    break
  else if (not self._KilledStatus):
    {Execute code}

# When out of the loop set killed event
KilledEvent.set()

И в моем главном потоке я добавил следующий процесс очистки.

#loop though all my resources
for ThreadInterfaces in ResourceThreadDict.values():
  # test each process in each resource
  for ThreadIf in ThreadInterfaces:
    # Wait for its event to be set
    ThreadIf['KillEvent'].wait()
    # When event have been recevied see if its hanging 
    # We know at this point every thing have been closed and all data have been purged correctly so if its still alive terminate it. 
    if ThreadIf['Thread'].is_alive(): 
      try:
        psutil.Process(ThreadIf['Thread'].pid).terminate()
      except (psutil.NoSuchProcess, AttributeError):
        pass

После многих испытаний я знаю, что очень трудно контролировать завершение и приложение с несколькими процессами, потому что вы просто не знаете, в каком порядке все ваши процессы получают этот сигнал.

Я пытался каким-то образом сохранить большую часть своих данных, когда они были уничтожены.Некоторые утверждают, что мне нужны эти данные при ручном завершении работы приложения.Но в этом случае это приложение запускает множество внешних сценариев и других приложений, и любое из них может заблокировать приложение, а затем вам нужно вручную убить его, но при этом сохранить информацию о том, что уже было выполнено.

Так что это мое решение моей нынешней проблемы с моими текущими знаниями.Любой вклад или более глубокие знания о том, что происходит, приветствуется.Обратите внимание, что это приложение работает как на Linux, так и на Windows.

С уважением

...