multiprocessing.Process target выполняется только 2 раза из 3 - PullRequest
2 голосов
/ 01 апреля 2020

Я использую многопроцессорную библиотеку для запуска Process параллельно с основной. Я использую аргумент target при инициализации, чтобы указать функцию для выполнения. Но функция не выполняется приблизительно 1 из 3 раз.

После копания в многопроцессорной библиотеке и использования обезьяньих патчей для отладки я обнаружил, что метод _bootstrap из BaseProcess (Process класс наследует от BaseProcess), который должен вызывать функцию, указанную в целевых параметрах при инициализации, не вызывался при вызове метода start() Процесса.

Поскольку моя ОС - Ubuntu 18.04, метод по умолчанию для запуска процесса - fork. Таким образом, Popen, используемый для запуска процесса, находится в файле popen_fork.py многопроцессорной библиотеки. И в этом классе Popen метод _launch вызывает os.fork(), а затем вызывает метод _bootstrap Процесса.

С помощью патча обезьяны я обнаружил, что код, который должен выполняться в дочернем процессе, вообще не выполняется, и поэтому функция, указанная в параметре target при инициализации процесса, не была выполняется при вызове метода start().

Невозможно воспроизвести проблему в более простой среде, чем та, над которой я работаю. Но вот код, который представляет то, что я делаю, и в чем моя проблема:

import time
from multiprocessing import Process
from multiprocessing.managers import BaseManager


class A:

    def __init__(self, manager):
        # manager is an object created by registering it in
        # multiprocessing.managers.BaseManager, so it is made for interprocess
        # communication
        self.manager = manager

        self.p = Process(target=self.process_method, args=(self.manager, ))

    def start(self):
        self.p.start()

    def process_method(self, manager):
        # This is the method that is not executed 2 out of 3 times
        print("(A.process_method) Entering method")
        c = 0
        while True:
            print(f"(A.process_method) Sending message : c = {c}")
            manager.on_reception(f"c = {c}")
            time.sleep(5)


class Manager:

    def __init__(self):
        self.msg = None
        self.unread_msg = False

    def on_reception(self, msg):
        self.msg = msg
        self.unread_msg = True

    def get_last_msg(self):
        if self.unread_msg:
            self.unread_msg = False
            return self.msg
        else:
            return None



if __name__ == "__main__":
    BaseManager.register("Manager", Manager)
    bm = BaseManager()
    bm.start()
    manager = bm.Manager()

    a = A(manager)
    a.start()
    while True:
        msg = manager.get_last_msg()
        if msg is not None:
            print(msg)

Метод, который должен выполняться каждый раз, - A.process_method. В этом примере это выполняется каждый раз, но в моей среде это не так.

У кого-нибудь когда-нибудь была эта проблема и знает, как ее исправить?

1 Ответ

1 голос
/ 03 апреля 2020

После копания я обнаружил, что сервер flask был запущен в потоке, а не в процессе. Я изменил его для запуска в процессе вместо потока, и теперь все работает так, как и должно.

И Flask, и мой Процесс используют пакет ведения журнала. И это может вызвать тупик при запуске нового процесса .

...