multiprocessing.Queue не работает как переменная экземпляра в классе? - PullRequest
0 голосов
/ 02 декабря 2018

Я хочу заключить многопроцессорную задачу в класс.И управляющие, и рабочие функции являются членами класса.Рабочие работают с использованием Pool.map_async(), поэтому результаты могут быть обработаны, пока другие рабочие еще работают.Результаты для обработки сохраняются в multiprocessing.Queue.Когда Очередь является переменной экземпляра, она не работает, тогда как глобальная переменная или переменная класса работает.

Пример:

import multiprocessing 
class A():
    # Queue as instance variable
    def __init__(self):
        self.qout = multiprocessing.Queue()
    def worker(self,x):
        self.qout.put(x*x)   
    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)        
            while (not self.qout.empty() or
                not res.ready()):
                val = self.qout.get()
                print(val)

qoutB = multiprocessing.Queue()
class B():
    # Queue as global variable
    def __init__(self):
        pass   
    def worker(self,x):
        qoutB.put(x*x)       
    def process(self):
        values = range(10)       
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)           
            while (not qoutB.empty() or
                not res.ready()):
                val = qoutB.get()
                print(val)

class C():
    # Queue as Class variable
    qout = multiprocessing.Queue()
    def __init__(self):
        pass
    def worker(self,x):
        self.qout.put(x*x)   
    def process(self):
        values = range(10)
        with multiprocessing.Pool() as pool:
            res = pool.map_async(self.worker,values)        
            while (not self.qout.empty() or
                not res.ready()):
                val = self.qout.get()
                print(val)  

Теперь, когда вы вызываете класс следующим образом (поместите его ниже определения классов)

a=A()
a.process()

не работает (вероятно, перестает ждать на self.qout.get(), но

a=B()
a.process()

и

a=C()
a.process()

работает (печатаетрезультаты). Почему?

Я не нашел соответствующей информации в документации Python . Я не пытался передать очередь в качестве аргумента, но это особенность, которая должна бытьскрыто от пользователя.

Опция B должна быть исключена, C не идеален, поскольку очередь будет разделена между всеми экземплярами класса.

Примечание : Это протестировано в Linux (Debian, Python 3.5 из репозитория).

Ответы [ 2 ]

0 голосов
/ 02 декабря 2018

Опять же, это не ответ на ваш вопрос.Однако я публикую это, потому что это делает вопрос спорным, потому что вам не нужно явно создавать и использовать multiprocessing.Queue, чтобы сделать что-то подобное.

Вместо этого рассмотрите возможность использования concurrent.futures.ProcessPoolExecutor для выполнения задачи.

Например:

import concurrent.futures

class A_Prime():
    def __init__(self):
        pass

    def worker(self, x):
        return x*x

    def process(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            classname = type(self).__name__
            print(classname, '- calling executor.map')
            res = [value for value in executor.map(self.worker, range(10))]
            print(classname, '- executor.map finished')
            print('  result:', res)


if __name__ == '__main__':
    test = A_Prime()
    test.process()
    print('done')

Вывод:

A_Prime - calling executor.map
A_Prime - executor.map finished
  result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
done
0 голосов
/ 02 декабря 2018
Алгоритм

SO дал мне интересные подсказки, которые я не смог найти раньше.

Исходя из этого ответа , очереди не могут быть переданы в качестве аргумента функциям, которые открывают новые процессы,потому что очереди не могут быть засолены.И это то, что вообще self.function() делает: это эквивалентно function(self).В случае класса A очередь пытаются передать работникам;где, как и в B и C, это не так и живет более или менее независимо от процесса

Те же рассуждения заключают из этот вопрос и ответы .Само собой разумеется, что manager.Queue здесь также не работает.

Неудачное тестирование MCVE

Это, вероятно, из-за различных методов запуска по умолчанию multiprocessing( см. Документы )

...