Почему мой собственный семафор на основе менеджера остается заблокированным навсегда - PullRequest
0 голосов
/ 17 июня 2020

Я написал собственный семафор на основе класса multiprocessing.Manager, потому что хочу использовать его для нескольких multiprocessing.Process - args. он работает хорошо, пока я тестирую его без multiprocessing.Manager в классе multiprocessing.Thread, но поскольку Lock нельзя мариновать, я решил переписать его, используя Manager.Lock(). когда основной поток (который отвечает за создание других потоков и предоставление новых данных) ожидает выполнения функции accrue() после накопления value времени, он не проснется, даже если потоки-потребители вызовут функцию release() после того, как их потреблял то, что предоставил основной поток.

Есть идеи, почему?

Спасибо

from multiprocessing import Manager
from multiprocessing.managers import ValueProxy
from threading import Event, Condition, Lock
from time import monotonic as _time


class JBSemaphore:

    def __init__(self, value=1, manager: Manager = None):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = manager.Condition() if manager else Condition(Lock())
        self._value = manager.Value('i', value, False) if manager else value
        self._initial_value = value
        self._empty = manager.Event() if manager else Event()
        self._empty.set()

    def acquire(self, blocking=True, timeout=None):

        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            value = self._value.value if isinstance(self._value, ValueProxy) else self._value
            while value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._empty.clear()
                if isinstance(self._value, ValueProxy):
                    self._value.value -= 1
                else:
                    self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def join(self, timeout=None):
        self._empty.wait(timeout)

    def release(self):

        with self._cond:
            value = self._value.value if isinstance(self._value, ValueProxy) else self._value
            if value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            if value >= self._initial_value - 1:
                self._empty.set()

            if isinstance(self._value, ValueProxy):
                self._value.value += 1
            else:
                self._value += 1
            self._cond.notify()

    def acquired(self):
        with self._cond:
            if isinstance(self._value, ValueProxy):
                return max(0, self._initial_value - self._value.value)
            else:
                return max(0, self._initial_value - self._value)

manager = Manager()
output_queue = manager.Queue()
input_queue = manager.Queue(maxsize=10)
self._semaphore = JBSemaphore(10, manager)

tasks = [Process(target=self.worker,
                 args=(input_queue, output_queue, self._logger, self._semaphore))
         for _ in range(10)]
[task.start() for task in tasks]
def worker(input_queue, output_queue, logger: Logger, semaphore: JBSemaphore):

for _input in iter(input_queue.get, None):
if not _input:
    pass

...

except Exception as ex:
    logger.error(ex)

try:
    semaphore.release()
except Exception as ex:
    print(ex)

while True:
        try:
            self._semaphore.acquire()
            section = next(task_list)
            if not section :
                raise StopIteration

            input_queue.put({...})

        except Exception as ex:
            self._semaphore.release()
    ```


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...