выглядит хорошо для меня.Семафоры не дают одновременным производителям и потребителям слишком много писать и читать, а блокировки не позволяют одновременным производителям или потребителям одновременно изменять индексы end
или start
.
Определенно необходимы два семафора.Вы можете снять одну из блокировок и использовать ее в get
и put
для защиты индексов start
и end
, которые не позволят потребителям и производителям одновременно получать доступ к очереди.( Реализация очереди CPython делает это.)
Я бы удалил атрибут size
в пользу len(self.buff)
, хотя и переименовал бы индексы start
и end
вread_index
и write_index
соответственно (и замки тоже).Кроме того, я думаю, что вы можете получить доступ к буферу без удержания блокировок (потому что сами списки являются потокобезопасными ):
def put(self, val):
self.open.acquire()
with self.write_lock:
index = self.write_index
self.write_index = (self.write_index + 1) % len(self.buff)
self.buff[index] = val
self.closed.release()
def get(self):
self.closed.acquire()
with self.read_lock:
index = self.read_index
self.read_index = (self.read_index + 1) % len(self.buff)
val = self.buff[index]
self.open.release()
return val
Вот небольшая тестовая программа, которую я использовал для воспроизведениявокруг:
def producer(queue, start, end, step):
for value in range(start, end, step):
queue.put(value)
print('Producer finished')
def consumer(queue, count, result, lock):
local_result = []
for _ in range(count):
local_result.append(queue.get())
with lock:
result.update(local_result)
print('Consumer finished')
def main():
value_count = 500000
producer_count = 50
consumer_count = 50
assert value_count % producer_count == 0
assert value_count % consumer_count == 0
queue = Queue(123)
result = set()
lock = Lock()
producers = [Thread(target=producer, args=(queue, i, value_count, producer_count)) for i in range(producer_count)]
consumers = [Thread(target=consumer, args=(queue, value_count // consumer_count, result, lock)) for _ in range(consumer_count)]
for p in producers:
p.start()
for c in consumers:
c.start()
for p in producers:
p.join()
for c in consumers:
c.join()
if len(result) != value_count:
raise ValueError('Result size is %d instead of %d' % (len(result), value_count))
if __name__ == '__main__':
main()