Я пытаюсь понять, как реализовать очередь с ограниченным размером для использования несколькими производителями и потребителями.У меня есть этот код:
Попытка 1:
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 0
self.start = 0
self.size = size
self.curr_size = 0
self.open = Condition()
self.closed = Condition()
self.closed.acquire()
def put(self, val):
self.open.acquire()
while self.curr_size == self.size:
self.open.wait()
self.buff[self.end] = val
self.end = (self.end+1)%self.size
self.curr_size+=1
self.closed.notify()
self.closed.release()
def get(self):
self.closed.acquire()
while self.curr_size == 0:
self.closed.wait()
val = self.buff[self.start]
self.start = (self.start+1)%self.size
self.curr_size-=1
self.open.notify()
self.open.release()
return val
Могу ли я еще упростить это (например, использовать только одну условную переменную или мьютекс)?
ОБНОВЛЕНИЕ A : Приведенный выше пример кода позволяет помещать в очередь только один элемент и не более, пока не будет вызван вызов get, тратя впустую остаток буфера.Вот обновление кода, который пытается это исправить:
Попытка 2
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 0
self.start = 0
self.size = size
self.curr_size = 0
self.mutex = Lock()
self.open = Condition(self.mutex)
self.closed = Condition(self.mutex)
def put(self, val):
self.mutex.acquire()
while self.curr_size == self.size:
self.open.wait()
self.buff[self.end] = val
self.end = (self.end+1)%self.size
self.curr_size+=1
self.closed.notify()
self.mutex.release()
def get(self):
self.mutex.acquire()
while self.curr_size == 0:
self.closed.wait()
val = self.buff[self.start]
self.start = (self.start+1)%self.size
self.curr_size-=1
self.open.notify()
self.mutex.release()
return val
ОБНОВЛЕНИЕ B : Здесь производитель блокирует потребителяи наоборот, есть ли способ, которым они могут быть параллельными, как в здесь с использованием семафоров ?
Попытка 3:
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 1
self.start = 0
self.size = size
self.start_lock = Lock()
self.end_lock = Lock()
self.open = Condition(self.end_lock)
self.closed = Condition(self.start_lock)
self.start_lock.acquire()
def size_fn(self):
return self.end + self.size - self.start if self.end <= self.start else self.end - self.start
def put(self, val):
with self.end_lock:
while size_fn() == self.size:
self.open.wait()
self.buff[self.end-1] = val
self.end = (self.end+1)%self.size
self.closed.notify()
def get(self):
with self.start_lock:
while size_fn() == 0:
self.closed.wait()
val = self.buff[(self.start+1)%self.size]
self.start = (self.start+1)%self.size
self.open.notify()
return val
Здесь производители и потребители используют разные мьютексыблокировки, но может произойти переключение контекста во время функции size_fn () или сразу после нее, что приведет к ненужным ожиданиям (когда она пуста или заполнена).Но общая производительность кажется улучшенной, поскольку производители и потребители могут работать одновременно.
Обновление C
В приведенном выше коде есть аномалии.
Так вотЕще одна попытка: я сначала создал семафоры с обратной инженерией, которые должны быть построены с использованием условных переменных, следующим образом:
class Semaphore:
def __init__(self, size):
self.size = size
self.curr_size = 0
self.mutex = Lock()
self.cv = Condition(self.mutex)
def acquire(self):
with self.mutex:
while self.curr_size == self.size:
self.cv.wait()
self.curr_size += 1
def release(self):
with self.mutex:
if self.curr_size == 0:
raise Exception("Releasing semaphore more times than acquired!")
self.curr_size-=1
self.cv.notify()
Теперь я мог бы использовать эту идею, чтобы получить реализацию Q (рассматривается здесь с использованием семафоров )только с переменными условия:
Попытка 4 класс Q: def init (self, size): self.buff = [None] * size self.end = 0self.start = 0 self.size = size
self.start_lock = Lock()
self.end_lock = Lock()
self.open_mutex = Lock()
self.open_num = 0
self.open_cv = Condition(self.open_mutex)
self.closed_mutex = Lock()
self.closed_num = self.size
self.closed_cv = Condition(self.closed_mutex)
def put(self, val):
with self.open_mutex:
while self.open_num == self.size:
self.open_cv.wait()
self.open_num+=1
with self.end_lock:
self.buff[self.end] = val
self.end = (self.end+1)%self.size
with self.closed_mutex:
self.closed_num-=1
self.closed_cv.notify()
def get(self):
with self.closed_mutex:
while self.closed_num == self.size:
self.closed_cv.wait()
self.closed_num+=1
with self.start_lock:
val = self.buff[self.start]
self.start = (self.start+1)%self.size
with self.open_mutex:
self.open_num-=1
self.open_cv.notify()
return val
Сообщите мне, если есть проблемы в вышеупомянутой реализации.