многопроцессорный самонаводящийся потребитель Python блокирует навсегда - PullRequest
1 голос
/ 31 октября 2011

Проблема в том, что потребитель никогда не выходит, он просто зависает, ничего не делая. Код должен был работать так:

Очередь создана, и в нее переданы некоторые данные задачи. Для обработки данных создается определенное количество потребителей. Когда потребитель узнает, что очередь пуста, потому что есть шанс, что другой потребитель может что-то поместить в очередь, он не может уйти, однако в списке consumers_finished он может указать, что у него нет рабочих мест. Цикл потребителей продолжается до тех пор, пока каждый работник не укажет, что завершил работу. Неизвестно, сколько работы будет, потому что потребители ставят задачи в очередь. Я читал кое-что об этом, но не было ясно, могут ли процессы зависать вечно, если они кормятся сами.

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, results, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.results = results
        self.consumers_finished = consumers_finished

    def run(self):
        while not all(flag for flag in self.consumers_finished.values()):
            task_data = self.task_queue.get()
            if not task_data:
                self.consumers_finished[self.name] = True
                continue

            self.consumers_finished[self.name] = False
            task_result = self.do_some_processing(task_data)
            self.task_queue.put(task_result)


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = multiprocessing.Queue()
        results = manager.list()
        consumers_finished = manager.dict()

        consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        task_queue.put(task_data)

        for consumer in consumers: consumer.join()

        return results

1 Ответ

2 голосов
/ 01 ноября 2011

Похоже, хороший сон действительно помогает, обновленный ум может сделать гораздо больше ... В любом случае, я нашел решение после изучения документации по python.

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, results, consumers_finished):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.results = results
        self.consumers_finished = consumers_finished

    def run(self):
        while not all(flag for flag in self.consumers_finished.values()):
            try:
                task = self.todo_queue.get(False)
                self.consumers_finished[self.name] = False
            except QueueEmpty:
                self.consumers_stopped[self.name] = True
                continue

            task_result = self.do_some_processing(task_data)
            self.task_queue.put(task_result)


class Starter(object):

    def start(self):
        manager = multiprocessing.Manager()
        task_queue = manager.Queue()
        results = manager.list()
        consumers_finished = manager.dict()

        consumers = [Consumer(task_queue, results, consumers_finished) for i in range(self.consumers_count)]

        for consumer in consumers:
            consumers_finished[consumer.name] = False
            consumer.start()

        task_queue.put(task_data)

        for consumer in consumers: consumer.join()

        return results

Это часть документации по python,Я думаю, что это объясняет мою проблему:

Предупреждение Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и не использовал JoinableQueue.cancel_join_thread ()), тогда этот процесс не завершитсяпока все буферизованные предметы не будут сброшеныЭто означает, что если вы попытаетесь присоединиться к этому процессу, вы можете зайти в тупик, если не уверены, что все элементы, помещенные в очередь, были использованы.Точно так же, если дочерний процесс не является демоническим, родительский процесс может зависать при выходе, когда он пытается присоединиться ко всем своим недемоническим дочерним процессам. Обратите внимание, что в очереди, созданной с помощью менеджера, такой проблемы нет. См. Рекомендации по программированию.

Итак, я просто изменил очередь, теперь она создается менеджером, и взадачи метода запуска потребителя по-другому берутся из очереди, см. код.

...