Производители Потребители с очередью ограниченного размера, использующей переменные условия Python - PullRequest
0 голосов
/ 21 сентября 2019

Я пытаюсь понять, как реализовать очередь с ограниченным размером для использования несколькими производителями и потребителями.У меня есть этот код:

Попытка 1:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.curr_size = 0
        self.open = Condition()
        self.closed = Condition()
        self.closed.acquire()

    def put(self, val):
        self.open.acquire()
        while self.curr_size == self.size:
            self.open.wait()
        self.buff[self.end] = val
        self.end = (self.end+1)%self.size
        self.curr_size+=1
        self.closed.notify()
        self.closed.release()

    def get(self):
        self.closed.acquire()
        while self.curr_size == 0:
            self.closed.wait()
        val = self.buff[self.start]
        self.start = (self.start+1)%self.size
        self.curr_size-=1
        self.open.notify()
        self.open.release()
        return val

Могу ли я еще упростить это (например, использовать только одну условную переменную или мьютекс)?

ОБНОВЛЕНИЕ A : Приведенный выше пример кода позволяет помещать в очередь только один элемент и не более, пока не будет вызван вызов get, тратя впустую остаток буфера.Вот обновление кода, который пытается это исправить:

Попытка 2

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.curr_size = 0
        self.mutex = Lock()
        self.open = Condition(self.mutex)
        self.closed = Condition(self.mutex)

    def put(self, val):
        self.mutex.acquire()
        while self.curr_size == self.size:
            self.open.wait()
        self.buff[self.end] = val
        self.end = (self.end+1)%self.size
        self.curr_size+=1
        self.closed.notify()
        self.mutex.release()

    def get(self):
        self.mutex.acquire()
        while self.curr_size == 0:
            self.closed.wait()
        val = self.buff[self.start]
        self.start = (self.start+1)%self.size
        self.curr_size-=1
        self.open.notify()
        self.mutex.release()
        return val

ОБНОВЛЕНИЕ B : Здесь производитель блокирует потребителяи наоборот, есть ли способ, которым они могут быть параллельными, как в здесь с использованием семафоров ?

Попытка 3:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 1
        self.start = 0
        self.size = size
        self.start_lock = Lock()
        self.end_lock = Lock()
        self.open = Condition(self.end_lock)
        self.closed = Condition(self.start_lock)
        self.start_lock.acquire()

    def size_fn(self):
        return self.end + self.size - self.start if self.end <= self.start else self.end - self.start

    def put(self, val):
        with self.end_lock:
            while size_fn() == self.size:
                self.open.wait()
            self.buff[self.end-1] = val
            self.end = (self.end+1)%self.size
            self.closed.notify()

    def get(self):
        with self.start_lock:
            while size_fn() == 0:
                self.closed.wait()
            val = self.buff[(self.start+1)%self.size]
            self.start = (self.start+1)%self.size
            self.open.notify()
        return val

Здесь производители и потребители используют разные мьютексыблокировки, но может произойти переключение контекста во время функции size_fn () или сразу после нее, что приведет к ненужным ожиданиям (когда она пуста или заполнена).Но общая производительность кажется улучшенной, поскольку производители и потребители могут работать одновременно.

Обновление C

В приведенном выше коде есть аномалии.

Так вотЕще одна попытка: я сначала создал семафоры с обратной инженерией, которые должны быть построены с использованием условных переменных, следующим образом:

class Semaphore:
    def __init__(self, size):
        self.size = size
        self.curr_size = 0
        self.mutex = Lock()
        self.cv = Condition(self.mutex)

    def acquire(self):
        with self.mutex:
            while self.curr_size == self.size:
                self.cv.wait()
            self.curr_size += 1         

    def release(self):
        with self.mutex:
            if self.curr_size == 0:
                raise Exception("Releasing semaphore more times than acquired!")
            self.curr_size-=1
            self.cv.notify()

Теперь я мог бы использовать эту идею, чтобы получить реализацию Q (рассматривается здесь с использованием семафоров )только с переменными условия:

Попытка 4 класс Q: def init (self, size): self.buff = [None] * size self.end = 0self.start = 0 self.size = size

        self.start_lock = Lock()
        self.end_lock = Lock()

        self.open_mutex = Lock()
        self.open_num = 0
        self.open_cv = Condition(self.open_mutex)

        self.closed_mutex = Lock()
        self.closed_num = self.size
        self.closed_cv = Condition(self.closed_mutex)

    def put(self, val):

            with self.open_mutex:
                while self.open_num == self.size:
                    self.open_cv.wait()
                self.open_num+=1

            with self.end_lock:
                self.buff[self.end] = val
                self.end = (self.end+1)%self.size

            with self.closed_mutex:
                self.closed_num-=1
                self.closed_cv.notify()

    def get(self):
            with self.closed_mutex:
                while self.closed_num == self.size:
                    self.closed_cv.wait()
                self.closed_num+=1

            with self.start_lock:
                val = self.buff[self.start]
                self.start = (self.start+1)%self.size

            with self.open_mutex:
                self.open_num-=1
                self.open_cv.notify()
        return val

Сообщите мне, если есть проблемы в вышеупомянутой реализации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...