Позвольте мне начать с спасибо, что нашли время для прочтения этого. Прежде всего, я хотел бы начать делиться своим кодом, видите удар. Это около 200 строк кода, но большинство строк являются определениями свойств.
Некоторый контекст: я просто пытаюсь лучше понять многопроцессорность, поэтому я сделал небольшой проект, чтобы попытаться запустить несколько процессов которые все разделяют одну очередь ввода и вывода. Это работает, но иногда оно блокируется сразу после вызова метода scheduler.start()
.
Я использую Python 3.8 на MacOS Catalina.
# coding=utf-8
from abc import abstractmethod, ABC
from multiprocessing import Process, Queue
from time import sleep
from typing import Optional, List, Dict, Union, Any
from uuid import uuid4
class AbstractTask(ABC):
def __init__(self, input_queue: Optional[Queue] = None, output_queue: Optional[Queue] = None, /):
self._input_queue = input_queue
self._output_queue = output_queue
@property
def input_queue(self):
return self._input_queue
@input_queue.setter
def input_queue(self, value: Queue):
if self._input_queue is None:
self._input_queue = value
@property
def output_queue(self):
return self._output_queue
@output_queue.setter
def output_queue(self, value: Queue):
if self._output_queue is None:
self._output_queue = value
@abstractmethod
def run(self):
pass
class SimpleTask(AbstractTask):
def __init__(self, input_queue: Optional[Queue] = None, output_queue: Optional[Queue] = None, /):
super().__init__(input_queue, output_queue)
def run(self):
while True:
event = self.input_queue.get()
print(f"Process ({id(self)} got a new event: {event}")
self.output_queue.put(event)
class Calculation(Process):
def __init__(self, _input: Queue, _output: Queue, _task: AbstractTask = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._input = _input
self._output = _output
self._task = _task
# debugging:
print(f"{self.pid=}")
print(f"input_queue id: {id(self._input)}")
print(f"output_queue id: {id(self._output)}")
def run(self) -> None:
if self._task is not None:
self._task.input_queue = self._input
self._task.output_queue = self._output
self._task.run()
class AbstractWorker(ABC):
def __init__(self):
"""
Abstract implementation for the Worker class.
"""
# attributes
self._identifier = int(uuid4())
self._status = None
self.error: bool = False
self.process: Optional[Process] = None
# initialisation
self.initialize()
@property
def identifier(self):
return self._identifier
@property
def status(self):
return self._status
@status.setter
def status(self, value):
if value is not None:
self.status = value
@abstractmethod
def initialize(self):
pass
def get_identifier(self):
return self.identifier
def get_status(self):
return self.status
def start(self, _input: Queue, _output: Queue, _task: AbstractTask):
# need: a sharable input queue
# need: a queue to put results
self.process = Calculation(_input, _output, _task, daemon=True)
self.process.start()
class Worker(AbstractWorker):
def __init__(self):
super().__init__()
def initialize(self):
print(f"Created new Worker with UUID: {self.identifier}")
class Scheduler:
def __init__(self, shared_input: Queue, shared_output: Queue):
# Attributes
self._workers: Dict[int, AbstractWorker] = dict()
self._tasks: List[AbstractTask] = []
self._shared_input: Queue = shared_input
self._shared_output: Queue = shared_output
@property
def shared_input(self):
if self._shared_input is None:
raise ValueError("attribute 'shared_input' is None, never given.")
return self._shared_input
@shared_input.setter
def shared_input(self, value: Queue):
if isinstance(value, Queue):
self._shared_input = value
else:
raise ValueError("Attribute 'shared_queue' must be of type 'multiprocessing.Queue'.")
@property
def shared_output(self):
if self._shared_output is None:
raise ValueError("attribute 'shared_output' is None, never given.")
return self._shared_output
@shared_output.setter
def shared_output(self, value: Queue):
if isinstance(value, Queue):
self._shared_output = value
else:
raise ValueError("Attribute 'shared_output' must be of type 'multiprocessing.Queue'.")
@property
def tasks(self):
return self._tasks
@tasks.setter
def tasks(self, new_task_list: List[AbstractTask]):
if all([isinstance(task, AbstractTask) for task in new_task_list]):
self._tasks = new_task_list
else:
raise ValueError("Every item in provided task list must be based of 'AbstractTask'.")
@property
def workers(self):
return self._workers
@workers.setter
def workers(self, new_worker_list: Dict[int, AbstractWorker]):
if all([isinstance(worker, AbstractWorker) for worker in new_worker_list.values()]):
def _check(_worker: AbstractWorker):
if _worker.status == 'WORKING':
return True
if any(_check(current_worker) for current_worker in self.workers.values()):
raise RuntimeError("Trying to set new workers while current workers are still running!")
self._workers = new_worker_list
def subscribe(self, worker: AbstractWorker):
self.workers[worker.identifier] = worker
def status(self, identifier: Optional[int] = None) -> Union[str, Dict[int, str]]:
if identifier is None:
return {identifier: worker.status for (identifier, worker) in self.workers.items()}
return self.workers[identifier].status
def start(self, task: AbstractTask):
for worker in self.workers.values():
worker.start(self.shared_input, self.shared_output, task)
def register(self, event: Any):
if self.shared_input:
self.shared_input.put(event, block=False)
def main():
_input_queue = Queue()
_output_queue = Queue()
scheduler = Scheduler(_input_queue, _output_queue)
w1 = Worker()
w2 = Worker()
w3 = Worker()
w4 = Worker()
_workers = [w1, w2, w3, w4]
for _worker in _workers:
scheduler.subscribe(_worker)
task = SimpleTask()
scheduler.start(task)
sleep(0.1) # Prevent deadlocking, something to do with Threading Lock and the bootstrapping time on the processes
for i in range(10000):
scheduler.register(str(i))
while not scheduler.shared_output.empty():
result = scheduler.shared_output.get()
print(f"Got a new result on main process: {result}")
if __name__ == '__main__':
main()
Я «решил» проблему, используя простой таймер сна менее чем на секунду, но, поскольку, на мой взгляд, это решение не изящно и некрасиво, я пошел и стал искать более элегантное решение. Я не нашел ни одного. Я думаю, что ошибка как-то связана с потоками, которые используют объекты Queue, похоже, что есть условие гонки, но нет способа проверить (что мне известно).
Для тех, кто интересуется KeyboardInterrupt traceback, который я получаю при запуске этого, я помещу это ниже здесь:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/util.py", line 277, in _run_finalizers
finalizer()
File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/util.py", line 201, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 195, in _finalize_join
thread.join()
File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1011, in join
self._wait_for_tstate_lock()
File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
EDIT 1: Кажется, это проблема, когда разветвление дочерних процессов устанавливает блокировку для объектов Queue, только вопрос почему блокировка настолько медленная?