Другая реализация концепции, с процессами, порождающими процессы (РЕДАКТИРОВАТЬ: Jit протестировано):
import numpy as np
# better pickling
import pathos
from contextlib import closing
from numba import jit
#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class NoDaemonPool(pathos.multiprocessing.Pool):
def Process(self, *args, **kwds):
return NoDaemonProcess(*args, **kwds)
# matrix dimensions
x = 100 # i
y = 500 # j
NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned
SS = np.zeros([x, y], dtype=float)
@jit
def foo(i):
return (i*i + 1)
@jit
def bar(phii, j):
return phii*(j+1)
# The code which is implemented down here:
'''
for i in range(x):
phii = foo(i)
for j in range(y):
SS[i, j] = bar(phii, j)
'''
# Threaded version:
# queue is in global scope
def outer_loop(i):
phii = foo(i)
# i is in process scope
def inner_loop(j):
result = bar(phii,j)
# the data is coordinates and result
return (i, j, result)
with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
res = list(pool.imap(inner_loop, range(y)))
return res
with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
results = list(pool.imap(outer_loop, range(x)))
result_list = []
for r in results:
result_list += r
# read results from queue
for res in result_list:
if res:
i, j, val = res
SS[i,j] = val
# check that all cells filled
print(np.count_nonzero(SS)) # 100*500
РЕДАКТИРОВАТЬ: Объяснение.
Причина всех сложностей в этом коде заключается в том, что я хотелсделать больше распараллеливания, чем запрашивал OP.Если распараллеливается только внутренний цикл, то внешний цикл остается, поэтому для каждой итерации внешнего цикла создается новый пул процессов и выполняются вычисления для внутреннего цикла.Поскольку, как мне показалось, эта формула не зависит от других итераций внешнего цикла, я решил все распараллелить: теперь вычисления для внешнего цикла назначаются процессам из пула, после чего каждый из «внешнего цикла»процессы создают свой собственный новый пул, и дополнительные процессы создаются для выполнения вычислений для внутреннего цикла.
Я могу ошибаться, и внешний цикл не должен быть распараллелен;В этом случае вы можете оставить только внутренний пул процессов.
Использование пулов процессов может быть неоптимальным решением, поскольку на создание и уничтожение пулов будет затрачено время.Более эффективным (но требующим ручной работы режима) решением будет создание экземпляров N процессов раз и навсегда, а затем подача в них данных и получение результатов с помощью многопроцессорной очереди ().Поэтому вам следует сначала проверить, дает ли это многопроцессорное решение достаточное ускорение (это произойдет, если время на создание и уничтожение пулов будет меньше по сравнению с Z0_SteadyState
прогоном).
Следующее осложнение заключается в том, что искусственноепул демонов.Процесс демона используется для изящной остановки приложения: при выходе из основной программы процессы демона завершаются без вывода сообщений.Однако процесс-демон не может порождать дочерние процессы.Здесь, в вашем примере, вам нужно дождаться окончания каждого процесса для извлечения данных, поэтому я сделал их недемонами, чтобы позволить порожденным дочерним процессам вычислять внутренний цикл.
Обмен данными: я полагаю, что объем данных, которыйнеобходимо заполнить матрицу и время, чтобы сделать это мало по сравнению с фактическими вычислениями.Поэтому я использую пулы и функцию pool.imap
(которая немного быстрее, чем .map()
. Вы также можете попробовать .imap_unordered()
, но в вашем случае это не должно иметь существенного значения).Таким образом, внутренний пул ожидает, пока все результаты не будут вычислены, и возвращает их в виде списка.Таким образом, внешний пул возвращает список списков, которые должны быть объединены.Затем матрица восстанавливается по этим результатам в одном быстром цикле.
Замечание with closing()
вещь: она закрывает пул автоматически после того, как все по этому выражению завершено, избегая потребления памяти процессами зомби.
Также вы можете заметить, что я странным образом определил одну функцию внутри другой, и внутри процессов у меня есть доступ к некоторым переменным, которые не были переданы туда: i
, phii
.Это происходит потому, что процессы имеют доступ к глобальной области, из которой они были запущены с политикой copy-on-change
(режим по умолчанию fork
).Это не связано с травлением и является быстрым.
Последний комментарий касается использования pathos
библиотеки вместо стандартных multiprocessing
, concurrent.futures
, subprocess
и т. Д. Причина в том, что pathos
имеетлучше использовать библиотеку выбора, так что она может сериализовать функции, которые не могут стандартные библиотеки (например, лямбда-функции).Я не знаю о вашей функции, поэтому я использовал более мощный инструмент, чтобы избежать дальнейших проблем.
И самое последнее: многопроцессорная обработка против многопоточности.Вы можете изменить pathos
пул обработки, скажем, на стандартный ThreadPoolExecutor
с concurrent.futures
, как я делал в начале, когда только начинал этот код.Но во время выполнения на моей системе ЦП загружается только на 100% (то есть используется одно ядро, похоже, что все 8 ядер загружены на 15-20%).Я не настолько квалифицирован, чтобы понимать различия между потоками и процессами, но мне кажется, что процессы позволяют использовать все ядра (100% каждое, общее количество 800%).