Я пытаюсь выполнить параллельные вычисления, чтобы ускорить цикл for (я уже использую itertools, мне нужно больше скорости, поскольку я выполняю цикл for несколько раз). Я новичок в многопроцессорной. Я проверил несколько вопросов о переполнении стека и попытался решить свою проблему, однако у меня все еще есть некоторые трудности. Я создаю общие переменные (self.A, self.B, self.C), чтобы многопроцессорная обработка выполнялась эффективно. Тем не менее, я думаю, что я делаю что-то не так, поскольку, когда я проверяю свои переменные после вычисления, я вижу, что они не изменились. Мой код немного сложен, поэтому приведенный ниже код является примером кода, который демонстрирует мою проблему. Спасибо за вашу помощь!
import numpy as np
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools
class F():
def __init__(self, num_process=4):
self.num_process = num_process
self.idx = list(itertools.product(range(5), range(10)))
self.A = np.zeros((5, 10))
if self.num_process > 1:
self.A = np.frombuffer(Array(c_double, self.A.flat, lock=False))
self.A.resize(5,10)
def solve(self):
self.B = np.zeros((10, 5, 10))
self.C = np.zeros((10, 5, 10))
if self.num_process > 1:
self.B = np.frombuffer(Array(c_double, self.B.flat, lock=False))
self.B.resize(10,5,10)
self.C = np.frombuffer(Array(c_double, self.C.flat, lock=False))
self.C.resize(10,5,10)
print('Before=',self.A,self.B,self.C)
for i in range(10):
if self.num_process == 1:
for (k,l) in self.idx:
self.B[i,k,l]=1
self.C[i,k,l]=1
else:
workers = []
for worker_num in range(self.num_process):
worker = Process(target=F.update,
args=(i, worker_num, self.num_process,
self.idx, self.A, self.B, self.C))
workers.append(worker)
worker.start()
for worker in workers:
worker.join()
print('After=',self.A,self.B,self.C)
@staticmethod
def update( i, worker_num, num_process, idx, A, B, C):
start_num = int(len(idx) * (worker_num/num_process))
end_num = int(len(idx) * ((worker_num+1)/num_process))
for j in range(start_num, end_num):
k,l = idx[j]
B[i,k,l]=min(2,A[k,l])
C[i,k,l]=2
if __name__ == '__main__':
var=F()
var.solve()
Когда я печатаю свои переменные после вычисления, я вижу, что они не изменились.
UPDATE
Я смог исправить свой код и выполнить многопроцессорную обработку, используя приведенный ниже код. Моя ошибка, как указал Рики Ким, заключалась в том, что я не создавал общие переменные. Приведенный ниже код достигает этого, однако он все еще медленнее, чем при использовании одного процесса (конечно, для выполнения тех же операций). Любые идеи о том, как сделать многопроцессорность быстрее и эффективнее. Спасибо!
import numpy as np
import multiprocessing as mp
from multiprocessing import Process, Array, Pool
from ctypes import c_double
import itertools
class F():
def __init__(self, num_process=4):
self.num_process = num_process
self.idx = list(itertools.product(range(5), range(10)))
self.A = np.zeros((5, 10))
def solve(self):
B_shared = Array(c_double, 10*5*10)
C_shared = Array(c_double, 10*5*10)
self.B = np.frombuffer(B_shared.get_obj())
self.B = self.B.reshape(10,5,10)
self.C = np.frombuffer(C_shared.get_obj())
self.C = self.C.reshape(10,5,10)
print('Before=',self.A,self.B,self.C)
for i in range(10):
if self.num_process == 1:
# perform some expensive operation
for (k,l) in self.idx:
self.B[i,k,l]=1
self.C[i,k,l]=1
else:
workers = []
for worker_num in range(self.num_process):
worker = Process(target=self.update,
args=(i, worker_num, self.num_process, B_shared, C_shared))
workers.append(worker)
worker.start()
for worker in workers:
worker.join()
print('After=',self.A,self.B,self.C)
def update(self, i, worker_num, num_process, B_shared, C_shared):
B = np.frombuffer(B_shared.get_obj())
B = B.reshape((10,5,10))
C = np.frombuffer(B_shared.get_obj())
C = C.reshape((10,5,10))
start_num = int(len(self.idx) * (worker_num/num_process))
end_num = int(len(self.idx) * ((worker_num+1)/num_process))
for j in range(start_num, end_num):
# perform some expensive operation
k,l = self.idx[j]
B[i,k,l]=min(2,self.A[k,l])
C[i,k,l]=2
if __name__ == '__main__':
mp.freeze_support()
var=F()
var.solve()