Как использовать многопоточность для ускорения вложенных вычислений в цикле? - PullRequest
0 голосов
/ 21 октября 2018

Я пытаюсь выполнить числовое интегрирование для большого массива, и вычисление занимает очень много времени.Я пытался ускорить мой код, используя numba и jit decorator, но numpy.trapz не поддерживается.

Моя новая идея заключается в создании n-многих потоков для параллельного выполнения вычислений, но яМне было интересно, как я могу это сделать или это вообще возможно?

Ссылка на код ниже

Могу ли я заставить sz [2] выполнять несколько потоков одновременно?который вызывает ZO_SteadState для вычисления значений?

    for i in range(sz[1]):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            SS[0, i, j] = ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v)

Рассматриваемый расчет.

@jit()
def ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v):
    g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
            exp(-(10 ** 5) * (PHI3 - 0) ** 2) + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2) + exp(
        -(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2) + exp(
        -(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2))  # stationary point heat source.

    y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
              * (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g

    x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)

    # integral transform of heat source. integral over y-axis
    gbarbar = np.trapz(x, rprime, axis=1)

    PHI2 = np.meshgrid(phiprime, s)[0]

    sz2 = PHI2.shape
    f = h2 * 37 * Array_Ones((sz2[0], sz[1]))  # boundary condition.

    fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4)  # integrate over y

    A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
                1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar

    return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)

Ответы [ 2 ]

0 голосов
/ 21 октября 2018

Другая реализация концепции, с процессами, порождающими процессы (РЕДАКТИРОВАТЬ: Jit протестировано):

import numpy as np

# better pickling
import pathos 
from contextlib import closing


from numba import jit

#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(pathos.multiprocessing.Pool):
    def Process(self, *args, **kwds):
        return NoDaemonProcess(*args, **kwds)




# matrix dimensions
x = 100 # i
y = 500 # j

NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned

SS = np.zeros([x, y], dtype=float)

@jit
def foo(i):
    return (i*i + 1)
@jit
def bar(phii, j):
    return phii*(j+1)

# The code which is implemented down here:
'''
for i in range(x):
    phii = foo(i)
    for j in range(y):
        SS[i, j] = bar(phii, j)
'''

# Threaded version:
# queue is in global scope


def outer_loop(i):

    phii = foo(i)

    # i is in process scope
    def inner_loop(j):
        result = bar(phii,j)
        # the data is coordinates and result
        return (i, j, result)


    with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
        res = list(pool.imap(inner_loop, range(y)))
    return res

with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:    
    results = list(pool.imap(outer_loop, range(x)))

result_list = []
for r in results:
    result_list += r


# read results from queue
for res in result_list:
    if res:
        i, j, val = res
        SS[i,j] = val


# check that all cells filled
print(np.count_nonzero(SS)) # 100*500

РЕДАКТИРОВАТЬ: Объяснение.

Причина всех сложностей в этом коде заключается в том, что я хотелсделать больше распараллеливания, чем запрашивал OP.Если распараллеливается только внутренний цикл, то внешний цикл остается, поэтому для каждой итерации внешнего цикла создается новый пул процессов и выполняются вычисления для внутреннего цикла.Поскольку, как мне показалось, эта формула не зависит от других итераций внешнего цикла, я решил все распараллелить: теперь вычисления для внешнего цикла назначаются процессам из пула, после чего каждый из «внешнего цикла»процессы создают свой собственный новый пул, и дополнительные процессы создаются для выполнения вычислений для внутреннего цикла.

Я могу ошибаться, и внешний цикл не должен быть распараллелен;В этом случае вы можете оставить только внутренний пул процессов.

Использование пулов процессов может быть неоптимальным решением, поскольку на создание и уничтожение пулов будет затрачено время.Более эффективным (но требующим ручной работы режима) решением будет создание экземпляров N процессов раз и навсегда, а затем подача в них данных и получение результатов с помощью многопроцессорной очереди ().Поэтому вам следует сначала проверить, дает ли это многопроцессорное решение достаточное ускорение (это произойдет, если время на создание и уничтожение пулов будет меньше по сравнению с Z0_SteadyState прогоном).

Следующее осложнение заключается в том, что искусственноепул демонов.Процесс демона используется для изящной остановки приложения: при выходе из основной программы процессы демона завершаются без вывода сообщений.Однако процесс-демон не может порождать дочерние процессы.Здесь, в вашем примере, вам нужно дождаться окончания каждого процесса для извлечения данных, поэтому я сделал их недемонами, чтобы позволить порожденным дочерним процессам вычислять внутренний цикл.

Обмен данными: я полагаю, что объем данных, которыйнеобходимо заполнить матрицу и время, чтобы сделать это мало по сравнению с фактическими вычислениями.Поэтому я использую пулы и функцию pool.imap (которая немного быстрее, чем .map(). Вы также можете попробовать .imap_unordered(), но в вашем случае это не должно иметь существенного значения).Таким образом, внутренний пул ожидает, пока все результаты не будут вычислены, и возвращает их в виде списка.Таким образом, внешний пул возвращает список списков, которые должны быть объединены.Затем матрица восстанавливается по этим результатам в одном быстром цикле.

Замечание with closing() вещь: она закрывает пул автоматически после того, как все по этому выражению завершено, избегая потребления памяти процессами зомби.

Также вы можете заметить, что я странным образом определил одну функцию внутри другой, и внутри процессов у меня есть доступ к некоторым переменным, которые не были переданы туда: i, phii.Это происходит потому, что процессы имеют доступ к глобальной области, из которой они были запущены с политикой copy-on-change (режим по умолчанию fork).Это не связано с травлением и является быстрым.

Последний комментарий касается использования pathos библиотеки вместо стандартных multiprocessing, concurrent.futures, subprocess и т. Д. Причина в том, что pathos имеетлучше использовать библиотеку выбора, так что она может сериализовать функции, которые не могут стандартные библиотеки (например, лямбда-функции).Я не знаю о вашей функции, поэтому я использовал более мощный инструмент, чтобы избежать дальнейших проблем.

И самое последнее: многопроцессорная обработка против многопоточности.Вы можете изменить pathos пул обработки, скажем, на стандартный ThreadPoolExecutor с concurrent.futures, как я делал в начале, когда только начинал этот код.Но во время выполнения на моей системе ЦП загружается только на 100% (то есть используется одно ядро, похоже, что все 8 ядер загружены на 15-20%).Я не настолько квалифицирован, чтобы понимать различия между потоками и процессами, но мне кажется, что процессы позволяют использовать все ядра (100% каждое, общее количество 800%).

0 голосов
/ 21 октября 2018

Это общая идея, которую я, вероятно, сделаю.Недостаточно контекста, чтобы дать вам более надежный пример.Вы должны будете установить все свои переменные в классе.

import multiprocessing

pool = multiprocessing.Pool(processes=12)
runner = mp_Z0(variable=variable, variable2=variable2)

for i, j, v in pool.imap(runner.run, range(sz[1]):
    SS[0, i, j] = v


class mp_Z0:

    def __init__(self, **kwargs):
        for k, v in kwargs:
            setattr(self, k, v)


    def run(self, i):
        phii = self.phi[i]
        for j in range(self.sz[2]):
            s = self.tau[0, i, j, :].reshape(1, self.n4)
            [R3, PHI3, S3] = meshgrid(self.rprime, self.phiprime, s)
            BCoeff = Bessel0(self.bm * R3)

            return (i, j, ZO_SteadyState(self.alpha, self.b, self.bm, BCoeff, Bessel_Denom, self.k2, self.maxt, phii, self.PHI2, PHI3, self.phiprime, R3, self.rprime, self.s, S3, self.T, self.v))

Это пример (при условии, что все находится в локальном пространстве имен), если вы делаете это без классов:

import multiprocessing

pool = multiprocessing.Pool(processes=12)    
def runner_function(i):
        phii = phi[i]
        for j in range(sz[2]):
            s = tau[0, i, j, :].reshape(1, n4)
            [R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
            BCoeff = Bessel0(bm * R3)

            return (i, j, ZO_SteadyState(alpha, b, bm, BCoeff, Bessel_Denom, k2, maxt, phii, PHI2, PHI3,
                                         phiprime, R3, rprime, s, S3, T, v))

for i, j, v in pool.imap(runner_function, range(sz[1]):
    SS[0, i, j] = v   
...