У вас возникшая проблема из-за target
, который вы вызываете из пула. Это target
- функция со ссылкой на экземпляр Worker
.
Теперь вы правы, что __init__()
вызывается только дважды. Но помните, когда вы отправляете что-либо в процессы и из процессов, python сначала нужно будет обработать данные.
Итак, поскольку ваш target
равен self.work_internal()
, python должен обрабатывать экземпляр Worker()
каждый раз, когда вызывается imap. Это приводит к одной проблеме: self.data
копируется снова и снова.
Следующее является доказательством. Я просто добавил 1 «входной» оператор и зафиксировал время последнего вычисления времени.
import numpy as np
import multiprocessing as mp
import time
def work_standalone(args):
return 2
class Worker:
def __init__(self):
self.data = np.random.random(size=(10000, 10000))
# leave a trace whenever init is called
with open('rnd-%d' % np.random.randint(100), 'a') as f:
f.write('init called\n')
def work_internal(self, args):
return 2
def _run(self, target):
with mp.Pool() as pool:
tasks = [[idx] for idx in range(16)]
result = pool.imap(target, tasks)
input("Wait for analysis")
for res in result:
pass
def run_internal(self):
self._run(self.work_internal)
# self._run(work_standalone)
def run_standalone(self):
self._run(work_standalone)
def work_internal(target):
with mp.Pool() as pool:
tasks = [[idx] for idx in range(16)]
result = pool.imap(target, tasks)
for res in result:
pass
if __name__ == '__main__':
t1 = time.time()
Worker().run_standalone()
t2 = time.time()
print(f'Standalone took {t2 - t1:.3f} seconds')
t3 = time.time()
Worker().run_internal()
t4 = time.time()
print(f'Internal took {t4 - t3:.3f} seconds')
Вы можете запустить код, когда он покажет «ждите анализа», go и проверьте использование памяти.
Примерно так
Затем во второй раз, когда вы увидите сообщение, нажмите Enter. И наблюдайте, как использование памяти снова увеличивается и уменьшается.
С другой стороны, если вы измените self._run(self.work_internal)
на self._run(work_standalone)
, вы заметите, что скорость очень высокая, а объем памяти не увеличивается, а также затраченное время намного меньше, чем на выполнение self.work_internal
.
Решение
Один из способов решения вашей проблемы - установить self.data
в качестве переменной класса stati c. В обычных случаях это предотвратит повторное копирование / повторную инициализацию переменной экземплярам. Это также предотвратило возникновение проблемы.
class Worker:
data = np.random.random(size=(10000, 10000))
def __init__(self):
pass
...