Продюсер Потребитель, использующий семафоры и мьютексы в Python - PullRequest
2 голосов
/ 21 сентября 2019

Я пытаюсь понять, как реализовать очередь с ограниченным размером буфера, которую могут использовать несколько производителей и потребителей, использующих семафоры Python.Вот моя реализация:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.end_lock = Lock()  # protect end from race across multiple producers
        self.start_lock = Lock()  # protect start from race across multiple consumers
        self.open = Semaphore(size)  # block till there's space to produce
        self.closed = Semaphore(size) # block till there's item to consume
        for _ in range(size):  # initialize with all closed acquired so that consumer is blocked
            self.closed.acquire()

    def put(self, val):
        self.open.acquire()
        with self.end_lock:
            self.buff[self.end] = val
            self.end = (self.end+1)%self.size
        self.closed.release()

    def get(self):
        self.closed.acquire()
        with self.start_lock:
            val = self.buff[(self.start)%self.size]
            self.start = (self.start+1)%self.size
        self.open.release()
        return val

Эта реализация без ошибок?Можно ли еще упростить использование меньшего числа мьютексов / семафоров?

1 Ответ

1 голос
/ 21 сентября 2019

выглядит хорошо для меня.Семафоры не дают одновременным производителям и потребителям слишком много писать и читать, а блокировки не позволяют одновременным производителям или потребителям одновременно изменять индексы end или start.

Определенно необходимы два семафора.Вы можете снять одну из блокировок и использовать ее в get и put для защиты индексов start и end, которые не позволят потребителям и производителям одновременно получать доступ к очереди.( Реализация очереди CPython делает это.)


Я бы удалил атрибут size в пользу len(self.buff), хотя и переименовал бы индексы start и end вread_index и write_index соответственно (и замки тоже).Кроме того, я думаю, что вы можете получить доступ к буферу без удержания блокировок (потому что сами списки являются потокобезопасными ):

    def put(self, val):
        self.open.acquire()
        with self.write_lock:
            index = self.write_index
            self.write_index = (self.write_index + 1) % len(self.buff)
        self.buff[index] = val
        self.closed.release()

    def get(self):
        self.closed.acquire()
        with self.read_lock:
            index = self.read_index
            self.read_index = (self.read_index + 1) % len(self.buff)
        val = self.buff[index]
        self.open.release()
        return val

Вот небольшая тестовая программа, которую я использовал для воспроизведениявокруг:

def producer(queue, start, end, step):
    for value in range(start, end, step):
        queue.put(value)
    print('Producer finished')


def consumer(queue, count, result, lock):
    local_result = []
    for _ in range(count):
        local_result.append(queue.get())
    with lock:
        result.update(local_result)
    print('Consumer finished')


def main():
    value_count = 500000
    producer_count = 50
    consumer_count = 50
    assert value_count % producer_count == 0
    assert value_count % consumer_count == 0

    queue = Queue(123)
    result = set()
    lock = Lock()
    producers = [Thread(target=producer, args=(queue, i, value_count, producer_count)) for i in range(producer_count)]
    consumers = [Thread(target=consumer, args=(queue, value_count // consumer_count, result, lock)) for _ in range(consumer_count)]

    for p in producers:
        p.start()
    for c in consumers:
        c.start()

    for p in producers:
        p.join()
    for c in consumers:
        c.join()

    if len(result) != value_count:
        raise ValueError('Result size is %d instead of %d' % (len(result), value_count))


if __name__ == '__main__':
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...