Я написал собственный семафор на основе класса multiprocessing.Manager
, потому что хочу использовать его для нескольких multiprocessing.Process
- args
. он работает хорошо, пока я тестирую его без multiprocessing.Manager
в классе multiprocessing.Thread
, но поскольку Lock
нельзя мариновать, я решил переписать его, используя Manager.Lock()
. когда основной поток (который отвечает за создание других потоков и предоставление новых данных) ожидает выполнения функции accrue()
после накопления value
времени, он не проснется, даже если потоки-потребители вызовут функцию release()
после того, как их потреблял то, что предоставил основной поток.
Есть идеи, почему?
Спасибо
from multiprocessing import Manager
from multiprocessing.managers import ValueProxy
from threading import Event, Condition, Lock
from time import monotonic as _time
class JBSemaphore:
def __init__(self, value=1, manager: Manager = None):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = manager.Condition() if manager else Condition(Lock())
self._value = manager.Value('i', value, False) if manager else value
self._initial_value = value
self._empty = manager.Event() if manager else Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
value = self._value.value if isinstance(self._value, ValueProxy) else self._value
while value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._empty.clear()
if isinstance(self._value, ValueProxy):
self._value.value -= 1
else:
self._value -= 1
rc = True
return rc
__enter__ = acquire
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
with self._cond:
value = self._value.value if isinstance(self._value, ValueProxy) else self._value
if value >= self._initial_value:
raise ValueError("Semaphore released too many times")
if value >= self._initial_value - 1:
self._empty.set()
if isinstance(self._value, ValueProxy):
self._value.value += 1
else:
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
if isinstance(self._value, ValueProxy):
return max(0, self._initial_value - self._value.value)
else:
return max(0, self._initial_value - self._value)
manager = Manager()
output_queue = manager.Queue()
input_queue = manager.Queue(maxsize=10)
self._semaphore = JBSemaphore(10, manager)
tasks = [Process(target=self.worker,
args=(input_queue, output_queue, self._logger, self._semaphore))
for _ in range(10)]
[task.start() for task in tasks]
def worker(input_queue, output_queue, logger: Logger, semaphore: JBSemaphore):
for _input in iter(input_queue.get, None):
if not _input:
pass
...
except Exception as ex:
logger.error(ex)
try:
semaphore.release()
except Exception as ex:
print(ex)
while True:
try:
self._semaphore.acquire()
section = next(task_list)
if not section :
raise StopIteration
input_queue.put({...})
except Exception as ex:
self._semaphore.release()
```