Multiprocessing.Queue взаимоблокировка при порождении процесса - PullRequest
0 голосов
/ 14 апреля 2020

Позвольте мне начать с спасибо, что нашли время для прочтения этого. Прежде всего, я хотел бы начать делиться своим кодом, видите удар. Это около 200 строк кода, но большинство строк являются определениями свойств.

Некоторый контекст: я просто пытаюсь лучше понять многопроцессорность, поэтому я сделал небольшой проект, чтобы попытаться запустить несколько процессов которые все разделяют одну очередь ввода и вывода. Это работает, но иногда оно блокируется сразу после вызова метода scheduler.start().

Я использую Python 3.8 на MacOS Catalina.

# coding=utf-8
from abc import abstractmethod, ABC
from multiprocessing import Process, Queue
from time import sleep
from typing import Optional, List, Dict, Union, Any
from uuid import uuid4


class AbstractTask(ABC):
    def __init__(self, input_queue: Optional[Queue] = None, output_queue: Optional[Queue] = None, /):
        self._input_queue = input_queue
        self._output_queue = output_queue

    @property
    def input_queue(self):
        return self._input_queue

    @input_queue.setter
    def input_queue(self, value: Queue):
        if self._input_queue is None:
            self._input_queue = value

    @property
    def output_queue(self):
        return self._output_queue

    @output_queue.setter
    def output_queue(self, value: Queue):
        if self._output_queue is None:
            self._output_queue = value

    @abstractmethod
    def run(self):
        pass


class SimpleTask(AbstractTask):
    def __init__(self, input_queue: Optional[Queue] = None, output_queue: Optional[Queue] = None, /):
        super().__init__(input_queue, output_queue)

    def run(self):
        while True:
            event = self.input_queue.get()
            print(f"Process ({id(self)} got a new event: {event}")
            self.output_queue.put(event)


class Calculation(Process):
    def __init__(self, _input: Queue, _output: Queue, _task: AbstractTask = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._input = _input
        self._output = _output
        self._task = _task

        # debugging:
        print(f"{self.pid=}")
        print(f"input_queue id: {id(self._input)}")
        print(f"output_queue id: {id(self._output)}")

    def run(self) -> None:
        if self._task is not None:
            self._task.input_queue = self._input
            self._task.output_queue = self._output
            self._task.run()


class AbstractWorker(ABC):
    def __init__(self):
        """
        Abstract implementation for the Worker class.

        """
        # attributes
        self._identifier = int(uuid4())
        self._status = None
        self.error: bool = False
        self.process: Optional[Process] = None

        # initialisation
        self.initialize()

    @property
    def identifier(self):
        return self._identifier

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, value):
        if value is not None:
            self.status = value

    @abstractmethod
    def initialize(self):
        pass

    def get_identifier(self):
        return self.identifier

    def get_status(self):
        return self.status

    def start(self, _input: Queue, _output: Queue, _task: AbstractTask):
        # need: a sharable input queue
        # need: a queue to put results
        self.process = Calculation(_input, _output, _task, daemon=True)
        self.process.start()


class Worker(AbstractWorker):
    def __init__(self):
        super().__init__()

    def initialize(self):
        print(f"Created new Worker with UUID: {self.identifier}")


class Scheduler:
    def __init__(self, shared_input: Queue, shared_output: Queue):

        # Attributes
        self._workers: Dict[int, AbstractWorker] = dict()
        self._tasks: List[AbstractTask] = []
        self._shared_input: Queue = shared_input
        self._shared_output: Queue = shared_output

    @property
    def shared_input(self):
        if self._shared_input is None:
            raise ValueError("attribute 'shared_input' is None, never given.")
        return self._shared_input

    @shared_input.setter
    def shared_input(self, value: Queue):
        if isinstance(value, Queue):
            self._shared_input = value
        else:
            raise ValueError("Attribute 'shared_queue' must be of type 'multiprocessing.Queue'.")

    @property
    def shared_output(self):
        if self._shared_output is None:
            raise ValueError("attribute 'shared_output' is None, never given.")
        return self._shared_output

    @shared_output.setter
    def shared_output(self, value: Queue):
        if isinstance(value, Queue):
            self._shared_output = value
        else:
            raise ValueError("Attribute 'shared_output' must be of type 'multiprocessing.Queue'.")

    @property
    def tasks(self):
        return self._tasks

    @tasks.setter
    def tasks(self, new_task_list: List[AbstractTask]):
        if all([isinstance(task, AbstractTask) for task in new_task_list]):
            self._tasks = new_task_list
        else:
            raise ValueError("Every item in provided task list must be based of 'AbstractTask'.")

    @property
    def workers(self):
        return self._workers

    @workers.setter
    def workers(self, new_worker_list: Dict[int, AbstractWorker]):
        if all([isinstance(worker, AbstractWorker) for worker in new_worker_list.values()]):
            def _check(_worker: AbstractWorker):
                if _worker.status == 'WORKING':
                    return True

            if any(_check(current_worker) for current_worker in self.workers.values()):
                raise RuntimeError("Trying to set new workers while current workers are still running!")

            self._workers = new_worker_list

    def subscribe(self, worker: AbstractWorker):
        self.workers[worker.identifier] = worker

    def status(self, identifier: Optional[int] = None) -> Union[str, Dict[int, str]]:
        if identifier is None:
            return {identifier: worker.status for (identifier, worker) in self.workers.items()}
        return self.workers[identifier].status

    def start(self, task: AbstractTask):
        for worker in self.workers.values():
            worker.start(self.shared_input, self.shared_output, task)

    def register(self, event: Any):
        if self.shared_input:
            self.shared_input.put(event, block=False)


def main():
    _input_queue = Queue()
    _output_queue = Queue()
    scheduler = Scheduler(_input_queue, _output_queue)

    w1 = Worker()
    w2 = Worker()
    w3 = Worker()
    w4 = Worker()

    _workers = [w1, w2, w3, w4]

    for _worker in _workers:
        scheduler.subscribe(_worker)

    task = SimpleTask()
    scheduler.start(task)
    sleep(0.1)  # Prevent deadlocking, something to do with Threading Lock and the bootstrapping time on the processes

    for i in range(10000):
        scheduler.register(str(i))

    while not scheduler.shared_output.empty():
        result = scheduler.shared_output.get()
        print(f"Got a new result on main process: {result}")


if __name__ == '__main__':
    main()

Я «решил» проблему, используя простой таймер сна менее чем на секунду, но, поскольку, на мой взгляд, это решение не изящно и некрасиво, я пошел и стал искать более элегантное решение. Я не нашел ни одного. Я думаю, что ошибка как-то связана с потоками, которые используют объекты Queue, похоже, что есть условие гонки, но нет способа проверить (что мне известно).

Для тех, кто интересуется KeyboardInterrupt traceback, который я получаю при запуске этого, я помещу это ниже здесь:

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/util.py", line 277, in _run_finalizers
    finalizer()
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/util.py", line 201, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 195, in _finalize_join
    thread.join()
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

EDIT 1: Кажется, это проблема, когда разветвление дочерних процессов устанавливает блокировку для объектов Queue, только вопрос почему блокировка настолько медленная?

...