Самый простой способ разделить класс между всеми процессами в многопроцессорности - PullRequest
0 голосов
/ 28 мая 2020

У меня есть класс, который запускает несколько потоков при инициализации. Первоначально я использовал многопоточность, но на собственном опыте убедился, насколько медленным это может быть. Когда я исследовал это, мне показалось, что многопроцессорность будет быстрее, потому что она фактически использует несколько ядер. Единственная трудность заключается в том, что он не разделяет ценности автоматически. Как я могу использовать следующий код self для всех процессов?

В идеале, он также должен совместно использоваться всеми процессами вне класса. Кроме того, я бы предпочел разделить весь класс, чем каждое отдельное значение, если это возможно.

import multiprocessing as mp
from time import sleep

class ThreadedClass:
    def __init__(self):
        self.var = 0
        #Here is where I would want to tell multiprocessing to share 'self'
        change_var = mp.Process(target=self.change_var, args=())
        print_var = mp.Process(target=self.print_var, args=())
        change_var.start()
        sleep(0.5)
        print_var.start()

    def change_var(self):
        while True:
            self.var += 1
            print("Changed var to ", self.var)
            sleep(1)

    def print_var(self):
        while True:
            print("Printing var: ", self.var)
            sleep(1)

ThreadedClass()

Я также включил вывод приведенного выше кода ниже:

Changed var to  1  
Printing var:  0  
Changed var to  2  
Printing var:  0 
Changed var to  3  
Printing var:  0  
Changed var to  4  
Printing var:  0 
Changed var to  5  
Printing var:  0  
Changed var to  6  
Printing var:  0 
Changed var to  7  
Printing var:  0  
Changed var to  8  
Printing var:  0 
Changed var to  9  
Printing var:  0  
Changed var to  10

Заранее спасибо.

1 Ответ

1 голос
/ 28 мая 2020

Прежде всего, многопроцессорность означает, что вы создаете подпроцессы. Это означает, что в целом у них есть свое место в памяти и они не разговаривают друг с другом. Для ясности: когда вы запускаете новый многопроцессорный поток, python копирует все ваши глобальные переменные в этот поток, а затем запускает этот поток отдельно от всего остального. Итак, когда вы создали свои два процесса, change_var и print_var, каждый из них получил копию self, а поскольку это две копии self, ни один из них не разговаривает с каждым. Один поток обновляет свою собственную копию и выдает подсчитываемые ответы, другой не обновляет себя. Вы можете легко проверить это самостоятельно:

import multiprocessing as mp
LIST = [] # This list is in parent process. 
def update(item):
    LIST.append(item)
p = mp.Process(target=update, args=(5,)) # Copies LIST, update, and anything else that is global. 
p.start()
p.join()
# The LIST in the sub-process is cleaned up in memory when the process ends. 
print(LIST) # The LIST in the parent process is not updated. 

Было бы очень опасно, если бы разные процессы обновляли переменные друг друга, когда они пытались обрабатывать их; следовательно, естественно, чтобы изолировать их (и предотвратить «ошибки сегментации»), все пространство имен копируется. Если вы хотите, чтобы подпроцессы общались друг с другом, вам необходимо связаться с менеджером и Очередью, которые предназначены для этого.

Я лично рекомендую вместо этого писать свой код вокруг таких вещей, как Pool (). Очень чисто, введите массив, верните массив, готово. Но если вы хотите go в кроличью нору, вот что я прочитал на веб-сайте, посвященном многопроцессорной обработке.

import multiprocessing as mp

def f(queue):
    queue.put(['stuff',15])
def g(queue):
    queue.put(['other thing'])

queue = mp.Queue()
p = mp.Process(target=f,args=(queue,))
q = mp.Process(target=g,args=(queue,))
p.start()
q.start()
for _ in range(2):
    print(queue.get())
p.join()
q.join()

Основная идея заключается в том, что очередь не копируется, а вместо этого позволяет оставлять вещи в очереди. Когда вы запускаете queue.get (), он ждет, пока в очереди не будет получено что-то, что было оставлено каким-то другим процессом. queue.get () блокируется и ждет. Это означает, что вы можете заставить один процесс читать содержимое другого процесса, например:

import multiprocessing as mp

def f(queue):
    obj = queue.get() # Blocks this sub-process until something shows up. 
    if obj:
        print('Something was in the queue from some other process.')
        print(obj)
def g(queue):
    queue.put(['leaving information here in queue'])

queue = mp.Queue()
p = mp.Process(target=f,args=(queue,)) 
q = mp.Process(target=g,args=(queue,))
p.start()

Это довольно круто, поэтому я рекомендую подождать здесь секунду, чтобы подумать о том, что ожидает обработки. Затем запустите процесс q.

q.start()

Обратите внимание, что p не дошел до завершения sh обработки, пока не был запущен q. Это потому, что Очередь заблокирована и ждала, чтобы что-то появилось.

# clean up 
p.join()
q.join()

Подробнее см .: https://docs.python.org/3.4/library/multiprocessing.html?highlight=process#multiprocessing .Queue

...