Основная проблема заключается в том, что ваш код состоит в том, что вы не проверяете, что список не пуст / заполнен после поток был уведомлен.Это может быть проблемой в следующей ситуации:
c1
и c2
- потоки потребителя, p1
- поток производителя.В начале очередь пуста.c1
не активен (в настоящее время находится в последней строке time.sleep...
), а c2
ожидает уведомления (в строке while item_ok.wait():
.
p1
добавляет элемент в очередь извонки item_ok.notify()
c1
заканчивают ожидание и получают блокировку c2
получает уведомление и пытается получить блокировку c1
потребляет элемент изОчередь и снимает блокировку c2
получает блокировку и пытается выскочить из пустой очереди
Решение
Вместо вызова .wait()
вв то время как условие (которое бессмысленно, потому что оно всегда возвращает None
на Python 2 и всегда True
на Python 3.2+, см. здесь ), вызовите .wait()
в теле цикла while и поместитеусловие, не является ли очередь заполненной / пустой в цикле while условие:
while not queue:
print('queue is empty, stop consuming')
item_ok.wait()
print('trying again')
Используя этот подход (который также используется в документах, связанных выше), поток проверяет, не по-прежнему ли очередьпустой / полный после того, как он проснулся и приобрел замок.on больше не выполняется (поскольку между ними был выполнен другой поток), поток снова ожидает выполнения условия.
Кстати, разница между python 2 и 3, описанная выше, также является причиной, по которой ваша программаведет себя по-разному на двух версиях.Это задокументированное поведение, а не ошибка в реализации.
Фиксированный код (который нормально работал на моей машине в течение последних 30 минут) для потоков производителей и потребителей выглядит следующим образом (я удалил цвета, потому чтоЯ не хотел устанавливать пакет):
class ProducerThread(Thread):
def run(self):
global queue
while True:
qlock.acquire()
while len(queue) >= CAPACITY:
print('queue is full, stop producing')
space_ok.wait()
print('trying again')
item = chr(ord('A')+randint(0,25))
print('['+' '.join(queue)+'] <= '+item)
queue.append(item)
item_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
class ConsumerThread(Thread):
def run(self):
global queue
while True:
qlock.acquire()
while not queue:
print('queue is empty, stop consuming')
item_ok.wait()
print('trying again')
item = queue.pop(0)
print(item+' <= ['+' '.join(queue)+']')
space_ok.notify()
qlock.release()
time.sleep((random()+0.2)/1.2)
Бонус
Вы упомянули, что программа не может быть закрыта с помощью Ctrl-C
(KeyboardInterrupt).Чтобы это исправить, вы можете сделать потоки "демонами", что означает, что они выходят, как только заканчивается основной поток.Используя приведенный выше код, Ctrl-C
прекрасно работает для завершения программы:
ProducerThread(name='red', daemon=True).start()
ProducerThread(name='green', daemon=True).start()
ProducerThread(name='blue', daemon=True).start()
ConsumerThread(name='cyan', daemon=True).start()
ConsumerThread(name='magenta', daemon=True).start()
ConsumerThread(name='yellow', daemon=True).start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Exiting")
Решает ли это вашу проблему?Пожалуйста, прокомментируйте ниже.