Проблема в том, что потребитель никогда не выходит, он просто зависает, ничего не делая. Код должен был работать так:
Очередь создана, и в нее переданы некоторые данные задачи. Для обработки данных создается определенное количество потребителей. Когда потребитель узнает, что очередь пуста, потому что есть шанс, что другой потребитель может что-то поместить в очередь, он не может уйти, однако в списке consumers_finished
он может указать, что у него нет рабочих мест. Цикл потребителей продолжается до тех пор, пока каждый работник не укажет, что завершил работу. Неизвестно, сколько работы будет, потому что потребители ставят задачи в очередь. Я читал кое-что об этом, но не было ясно, могут ли процессы зависать вечно, если они кормятся сами.
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, results, consumers_finished):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.results = results
self.consumers_finished = consumers_finished
def run(self):
while not all(flag for flag in self.consumers_finished.values()):
task_data = self.task_queue.get()
if not task_data:
self.consumers_finished[self.name] = True
continue
self.consumers_finished[self.name] = False
task_result = self.do_some_processing(task_data)
self.task_queue.put(task_result)
class Starter(object):
def start(self):
manager = multiprocessing.Manager()
task_queue = multiprocessing.Queue()
results = manager.list()
consumers_finished = manager.dict()
consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]
for consumer in consumers:
consumers_finished[consumer.name] = False
consumer.start()
task_queue.put(task_data)
for consumer in consumers: consumer.join()
return results