Python: выполнение вложенного цикла, 2D движущееся окно, параллельно - PullRequest
4 голосов
/ 22 октября 2019

Я работаю с топографическими данными. Для одной конкретной проблемы я написал функцию на Python, которая использует движущееся окно определенного размера для перемещения по матрице (сетке высот). Затем я должен выполнить анализ этого окна и установить в ячейке в центре окна результирующее значение.

Мой конечный результат - это матрица того же размера, что и моя исходная матрица, которая была изменена в соответствии с моиманализ. Эта проблема занимает 11 часов на небольшой площади, поэтому я подумал, что распараллеливание внутреннего цикла ускорит процесс. В качестве альтернативы, может быть и умное решение для векторизации ...

См. Мою функцию ниже, DEM - это двумерный массив, w - размер окна.

def RMSH_det(DEM, w):
    import numpy as np
    from scipy import signal
    [nrows, ncols] = np.shape(DEM)

    #create an empty array to store result
    rms = DEM*np.nan

#    nw=(w*2)**2
#    x = np.arange(0,nw)

    for i in np.arange(w+1,nrows-w):


        for j in np.arange(w+1,ncols-w):

            d1 = np.int64(np.arange(i-w,i+w))
            d2 = np.int64(np.arange(j-w,j+w))

            win = DEM[d1[0]:d1[-1],d2[0]:d2[-1]]

            if np.max(np.isnan(win)) == 1:
                rms[i,j] = np.nan

            else:
                win = signal.detrend(win, type = 'linear')
                z = np.reshape(win,-1)
                nz = np.size(z)
                rootms = np.sqrt(1 / (nz - 1) * np.sum((z-np.mean(z))**2))
                rms[i,j] = rootms


    return(rms)

Я искал SO / SE для решения моего вопроса и сталкивался со многими примерами вложенных циклов for и пытался запустить их параллельно. Я изо всех сил пытался адаптировать свой код в соответствии с примерами и был бы признателен за помощь. Решение этой проблемы помогло бы мне работать с несколькими другими функциями движущегося окна, которые у меня есть.

До сих пор я переместил внутренний цикл в его собственную функцию, которую можно вызывать из внешнего цикла:

def inLoop(i, w, DEM,rms,ncols):
        for j in np.arange(w+1,ncols-w):

            d1 = np.int64(np.arange(i-w,i+w))
            d2 = np.int64(np.arange(j-w,j+w))

            win = DEM[d1[0]:d1[-1],d2[0]:d2[-1]]

            if np.max(np.isnan(win)) == 1:
                rms[i,j] = np.nan

            else:
                win = signal.detrend(win, type = 'linear')
                z = np.reshape(win,-1)
                nz = np.size(z)
                rootms = np.sqrt(1 / (nz - 1) * np.sum((z-np.mean(z))**2))
                rms[i,j] = rootms


        return(rms)

Но я не был уверен в правильном способе кодирования вызова в Pool с необходимыми переменными, которые должны быть введены во внутренний цикл. См. Внешнюю петлю ниже:

 for i in np.arange(w+1,nrows-w):
        number_of_workers = 8

        with Pool(number_of_workers) as p:
            #call the pool
            p.starmap(inLoop, [i, w, DEM, rms, ncols])


Остальные вопросы:

  • Можно ли оптимизировать этот код путем распараллеливания?

  • Как мне успешно сохранить результат параллельного вложенного цикла?

Ответы [ 2 ]

3 голосов
/ 23 октября 2019

Решение с использованием Numba

В некоторых случаях это очень легко сделать, если поддерживаются все используемые вами функции. В вашем коде win = signal.detrend(win, type = 'linear') - это часть, которую вы должны реализовать в Numba, потому что эта функция не поддерживается.

Реализация отклонения в Numba

Если вы посмотрите на исходный код отклонения и извлечете соответствующие части для вашей проблемы, это можетвыглядит так:

@nb.njit()
def detrend(w):
    Npts=w.shape[0]
    A=np.empty((Npts,2),dtype=w.dtype)
    for i in range(Npts):
        A[i,0]=1.*(i+1) / Npts
        A[i,1]=1.

    coef, resids, rank, s = np.linalg.lstsq(A, w.T)
    out=w.T- np.dot(A, coef)
    return out.T

Я также реализовал более быстрое решение для np.max(np.isnan(win)) == 1

@nb.njit()
def isnan(win):
    for i in range(win.shape[0]):
        for j in range(win.shape[1]):
            if np.isnan(win[i,j]):
                return True
    return False

Основная функция

Как я использовал Numbaздесь распараллеливание очень простое, просто prange на внешнем цикле и

import numpy as np
import numba as nb

@nb.njit(parallel=True)
def RMSH_det_nb(DEM, w):
    [nrows, ncols] = np.shape(DEM)

    #create an empty array to store result
    rms = DEM*np.nan

    for i in nb.prange(w+1,nrows-w):
        for j in range(w+1,ncols-w):
            win = DEM[i-w:i+w-1,j-w:j+w-1]

            if isnan(win):
                rms[i,j] = np.nan
            else:
                win = detrend(win)
                z = win.flatten()
                nz = z.size
                rootms = np.sqrt(1 / (nz - 1) * np.sum((z-np.mean(z))**2))
                rms[i,j] = rootms

    return rms

Время (маленький пример)

w = 10
DEM=np.random.rand(100, 100).astype(np.float32)

res1=RMSH_det(DEM, w)
res2=RMSH_det_nb(DEM, w)
print(np.allclose(res1,res2,equal_nan=True))
#True

%timeit res1=RMSH_det(DEM, w)
#1.59 s ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res2=RMSH_det_nb(DEM, w) #approx. 55 times faster
#29 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Время длябольшие массивы

w = 10
DEM=np.random.rand(1355, 1165).astype(np.float32)
%timeit res2=RMSH_det_nb(DEM, w)
#6.63 s ± 21.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

[Редактировать] Реализация с использованием нормальных уравнений

Переопределенная система

Этот метод имеет меньшая числовая точность . Хотя это решение намного быстрее.

@nb.njit()
def isnan(win):
    for i in range(win.shape[0]):
        for j in range(win.shape[1]):
            if win[i,j]==np.nan:
                return True
    return False

@nb.njit()
def detrend(w):
    Npts=w.shape[0]
    A=np.empty((Npts,2),dtype=w.dtype)
    for i in range(Npts):
        A[i,0]=1.*(i+1) / Npts
        A[i,1]=1.

    coef, resids, rank, s = np.linalg.lstsq(A, w.T)
    out=w.T- np.dot(A, coef)
    return out.T

@nb.njit()
def detrend_2(w,T1,A):
    T2=np.dot(A.T,w.T)
    coef=np.linalg.solve(T1,T2)

    out=w.T- np.dot(A, coef)

    return out.T

@nb.njit(parallel=True)
def RMSH_det_nb_normal_eq(DEM,w):
    [nrows, ncols] = np.shape(DEM)

    #create an empty array to store result
    rms = DEM*np.nan

    Npts=w*2-1
    A=np.empty((Npts,2),dtype=DEM.dtype)
    for i in range(Npts):
        A[i,0]=1.*(i+1) / Npts
        A[i,1]=1.

    T1=np.dot(A.T,A)

    nz = Npts**2
    for i in nb.prange(w+1,nrows-w):
        for j in range(w+1,ncols-w):
            win = DEM[i-w:i+w-1,j-w:j+w-1]

            if isnan(win):
                rms[i,j] = np.nan
            else:
                win = detrend_2(win,T1,A)
                rootms = np.sqrt(1 / (nz - 1) * np.sum((win-np.mean(win))**2))
                rms[i,j] = rootms

    return rms

Время

w = 10
DEM=np.random.rand(100, 100).astype(np.float32)

res1=RMSH_det(DEM, w)
res2=RMSH_det_nb(DEM, w)
print(np.allclose(res1,res2,equal_nan=True))
#True

%timeit res1=RMSH_det(DEM, w)
#1.59 s ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res2=RMSH_det_nb_normal_eq(DEM,w)
#7.97 ms ± 89.4 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

Оптимизированное решение с использованием нормальных уравнений

Временные массивы используются повторно, чтобы избежать дорогостоящих выделений памяти, и используется пользовательская реализация для умножения матриц. Это рекомендуется только для очень маленьких матриц, в большинстве других случаев np.dot (sgeemm) будет намного быстрее.

@nb.njit()
def matmult_2(A,B,out):
    for j in range(B.shape[1]):
        acc1=nb.float32(0)
        acc2=nb.float32(0)
        for k in range(B.shape[0]):
            acc1+=A[0,k]*B[k,j]
            acc2+=A[1,k]*B[k,j]
        out[0,j]=acc1
        out[1,j]=acc2
    return out

@nb.njit(fastmath=True)
def matmult_mod(A,B,w,out):
    for j in range(B.shape[1]):
        for i in range(A.shape[0]):
            acc=nb.float32(0)
            acc+=A[i,0]*B[0,j]+A[i,1]*B[1,j]
            out[j,i]=acc-w[j,i]
    return out

@nb.njit()
def detrend_2_opt(w,T1,A,Tempvar_1,Tempvar_2):
    T2=matmult_2(A.T,w.T,Tempvar_1)
    coef=np.linalg.solve(T1,T2)
    return matmult_mod(A, coef,w,Tempvar_2)

@nb.njit(parallel=True)
def RMSH_det_nb_normal_eq_opt(DEM,w):
    [nrows, ncols] = np.shape(DEM)

    #create an empty array to store result
    rms = DEM*np.nan

    Npts=w*2-1
    A=np.empty((Npts,2),dtype=DEM.dtype)
    for i in range(Npts):
        A[i,0]=1.*(i+1) / Npts
        A[i,1]=1.

    T1=np.dot(A.T,A)

    nz = Npts**2
    for i in nb.prange(w+1,nrows-w):
        Tempvar_1=np.empty((2,Npts),dtype=DEM.dtype)
        Tempvar_2=np.empty((Npts,Npts),dtype=DEM.dtype)
        for j in range(w+1,ncols-w):
            win = DEM[i-w:i+w-1,j-w:j+w-1]

            if isnan(win):
                rms[i,j] = np.nan
            else:
                win = detrend_2_opt(win,T1,A,Tempvar_1,Tempvar_2)
                rootms = np.sqrt(1 / (nz - 1) * np.sum((win-np.mean(win))**2))
                rms[i,j] = rootms

    return rms

Время

w = 10
DEM=np.random.rand(100, 100).astype(np.float32)

res1=RMSH_det(DEM, w)
res2=RMSH_det_nb_normal_eq_opt(DEM, w)
print(np.allclose(res1,res2,equal_nan=True))
#True

%timeit res1=RMSH_det(DEM, w)
#1.59 s ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res2=RMSH_det_nb_normal_eq_opt(DEM,w)
#4.66 ms ± 87.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Времена для isnan

Эта функция является совершенно другой реализацией. Это намного быстрее, если NaN находится в начале массива, но в любом случае, даже если и нет, то есть некоторое ускорение. Я сравнил его с небольшими массивами (приблизительный размер окна) и большим размером, предложенным @ user3666197.

case_1=np.full((20,20),np.nan)
case_2=np.full((20,20),0.)
case_2[10,10]=np.nan
case_3=np.full((20,20),0.)

case_4 = np.full( ( int( 1E4 ), int( 1E4 ) ),np.nan)
case_5 = np.ones( ( int( 1E4 ), int( 1E4 ) ) )

%timeit np.any(np.isnan(case_1))
%timeit np.any(np.isnan(case_2))
%timeit np.any(np.isnan(case_3))
%timeit np.any(np.isnan(case_4))
%timeit np.any(np.isnan(case_5))
#2.75 µs ± 73.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
#2.75 µs ± 46.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
#2.76 µs ± 32.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
#81.3 ms ± 2.97 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
#86.7 ms ± 2.26 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%timeit isnan(case_1)
%timeit isnan(case_2)
%timeit isnan(case_3)
%timeit isnan(case_4)
%timeit isnan(case_5)
#244 ns ± 5.02 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
#357 ns ± 1.07 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
#475 ns ± 9.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
#235 ns ± 0.933 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
#58.8 ms ± 2.08 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
1 голос
/ 22 октября 2019

Q : Эта проблема занимает 11 часов , чтобы бегать на небольшой территории, ... следите за обновлениями, мы можем и мы получим менее 20 [мин] !!

были предоставлены соответствующие объяснения, за что я благодарю автора O / P:

# DEM.shape = [nrows, ncols] = [ 1355, 1165 ]
# DEM.dtype = float32 
#    .flags = C_CONTIGUOUS    : True
#             F_CONTIGUOUS    : False
#             OWNDATA         : True
#             WRITEABLE       : True
#             ALIGNED         : True
#             WRITEBACKIFCOPY : False
#             UPDATEIFCOPY    : False

Я попытался просмотреть код и настроить макетнемного более эффективный код, прежде чем приступить к внедрению всех популярных и готовых к использованию numpy + numba стероидов, и промежуточный numpy -только результат работает
наобразец [100,100] матрицы высот для около ~ 6 [s] при указанной ширине окна ядра w = 10

То же самое, для [200,200] DEM-сетка, принимает под ~ 36 [s] - очевидно, масштабирование составляет ~ O( N^2 )

То же самое, для [1000,1000] DEM-сетки, взял под ~ 1077 [s] ~ 17.6 [min] вау!

Поле .jit пробный на [1000,1000] DEM-grid в настоящее время тестируется и обновит сообщение после завершения + после numba.jit() код будет рад запускать дальнейшие ускоренные результаты


Пока, довольно многообещающе, не правда ли?

Если вы @ morrismc протестируйте код "как есть" сейчас, на [100,100]-матрица, мы уже можем угадать достигнутый диапазон основного ускорения , даже до завершения запущенных тестов.

>>> pass;    import numpy as np
>>> from zmq import Stopwatch; clk = Stopwatch()
>>>
>>> size =  100; demF32 = np.random.random( ( size, size ) ).astype( np.float32 ); resF32 = demF32.copy(); clk.start(); _ = RMSH_det( demF32, 10, resF32 ); t = clk.stop(); print( "{1:>13d} [us]\nNumOf_np.nan-s was {0:d}".format( _, t ) )
      6492192 [us]
NumOf_np.nan-s was 0

>>> size =  200; demF32 = np.random.random( ( size, size ) ).astype( np.float32 ); resF32 = demF32.copy(); clk.start(); _ = RMSH_det( demF32, 10, resF32 ); t = clk.stop(); print( "{1:>13d} [us]\nNumOf_np.nan-s was {0:d}".format( _, t ) )
     35650629 [us]
NumOf_np.nan-s was 0

>>> size = 1000; demF32 = np.random.random( ( size, size ) ).astype( np.float32 ); resF32 = demF32.copy(); clk.start(); _ = RMSH_det( demF32, 10, resF32 ); t = clk.stop(); print( "{1:>13d} [us]\nNumOf_np.nan-s was {0:d}".format( _, t ) )
   1058702889 [us]
NumOf_np.nan-s was 0

Все это на scipy 1.2.1, таким образом, без1.3.1 возможны дальнейшие ускорения


A numba.jit() LLVM-скомпилированный код. Ой, медленнее?

numba.jit() - ускорение показало примерно 200 [ms] хуже время выполнения на [100,100] DEM-сетке, с указанием подписи (такздесь не начислялись расходы на специальный анализ) и nogil = True («0.43.1 + 0.g8dabe7abe.dirty» пока не самый последний)

Думаю, здесь больше нечего получить, безПереместив игру в скомпилированные Cython территории, но при этом имея десятки минут вместо десятков часов, Alea Iacta Est - - просто numpy интеллектуальное векторизованное правило кода!


ЭПИЛОГ:

Если исходный алгоритм был верным (и некоторые сомнения оставались в исходном коде для дальнейшей работы по улучшению), любая попытка запустить какую-то другуюформа [PARALLEL] поток выполнения кода здесь не поможет (ядра-окна [w, w] - очень малые и несмежные области структуры памяти матрицы DEM, затраты на ввод-вывод памятиявляются доминирующей частью бюджета времени исполнения здесь, и некоторые более хорошие индексации могут улучшитьповторное использование строки кэша, но общие усилия значительно превышают бюджет, так как цель снижения с ~ 11 [hrs] до ~ 6 [hrs] была более чем успешно достигнута с ~ 20 [min] время выполнения, достижимое для [1300,1100] float32 DEM-сетки

Код был оставлен как есть (не PEP-8) из-за всех дополнительных дидактических значений для [DOC.me], [TEST.me] и [PERF.me] фаз QA, так что все виды PEP-isto-evangelisators действительно имеют авторское представление O / P на макет на всю ширину экрана слева, чтобыпозвольте понять WHY и улучшить код, который с зачищенными комментариями потерял бы ее / ее путь вперед в дальнейшем улучшении производительности кода. Thx.

@jit( [ "int32( float32[:,:], int32, float32[:,:] )", ], nogil    = True )                  # numba.__version__ '0.43.1+0.g8dabe7abe.dirty'
def RMSH_det_jit( DEMf32, w, rmsRESULTf32 ):                            # pre-allocate rmsRESULTf32[:,:] externally
    #import numpy as np
    #from scipy import signal
    #
    # [nrows, ncols] = np.shape( DEM )                                  # avoid ~ [ 1355, 1165 ]
    #                                                                   # DEM.dtype = float32 
    #                                                                   #    .flags = C_CONTIGUOUS    : True
    #                                                                   #             F_CONTIGUOUS    : False
    #                                                                   #             OWNDATA         : True
    #                                                                   #             WRITEABLE       : True
    #                                                                   #             ALIGNED         : True
    #                                                                   #             WRITEBACKIFCOPY : False
    #                                                                   #             UPDATEIFCOPY    : False
    #
    rmsRESULTf32[:,:] = np.nan                                          #        .STO[:,:] np.nan-s, using in-place assignment into the by-ref passed, externally pre-allocated np.ndarray
    dtdWIN            = np.ones( ( 2 * w - 1,                           #        .ALLOC once, re-use 1M+ times
                                   2 * w - 1 ) )
    a_div_by_nz_minus1 = 1. / ( dtdWIN.size - 1  )                      #        .SET float CONST with about a ~1M+ re-use
    a_num_of_NaNs      = 0                                              #        .SET i4 bonus value, ret'd as a side-effect of the signature ... 
    # rms = DEM*np.nan                                                  # avoid ( pre-alloc rmsRESULTf32 ) externally create and pass a right-sized, empty array to store all results
    # nw  = ( w * 2 )**2
    # x   = np.arange( 0, nw )

    #                        11..1344
    #or     i in np.arange( w+1,           nrows-w ):                   # w ~ 10 -> [11:1344, 11:1154]
    for     i in np.arange( w+1, DEMf32.shape[0]-w ):                   #         ??? never touches DEM-row/column[0]?? or off-by-one indexing error ???
        fromI = i - w                                                   #        .UPD ALAP
        tillI = i + w - 1                                               #        .UPD ALAP upper bound index excluded ( this is how a code in [ np.arange(...)[0]:np.arange(...)[-1] ] works )
        #                    11..1154
        #or j in np.arange( w+1,           ncols-w ):
        for j in np.arange( w+1, DEMf32.shape[1]-w ):
            fromJ = j - w                                               #        .UPD ALAP
            tillJ = j + w - 1                                           #        .UPD ALAP upper bound index excluded ( this is how a code in [ np.arange(...)[0]:np.arange(...)[-1] ] works )
            #                       1..1334:21..1354                    #         ??? never touches first/last DEM-row/column??
            # d1 = np.int64( np.arange( i-w, i+w ) )                    # AVOID: 1M+ times allocated, yet never consumed, but their edge values
            # d2 = np.int64( np.arange( j-w, j+w ) )                    # AVOID: 1M+ times allocated, yet never consumed, but their edge values

            # win = DEM[ d1[0]:d1[-1],                                  # AVOID: while a .view-only, no need to 1M+ times instantiate a "kernel"-win(dow] ( this will create a np.view into the original DEM, not a copy ! )
            #            d2[0]:d2[-1]                                   # ?.or.?   NOT a .view-only, but a new .copy() instantiated, so as to call .detrend() w/o in-place modifying DEMf32 ???
            #            ]                                              # ?.or.?   NOT a .view-only, but a new .copy() instantiated, so as to call .detrend() w/o in-place modifying DEMf32 ???
            dtdWIN[:,:] = DEMf32[fromI:tillI, fromJ:tillJ]              #          NOT a .view-only, but a     .copy() re-populated into a just once and only once pre-allocated dtdWIN, via an in-place copy
            #f np.max( np.isnan(    win ) ) == 1:                       # AVOID: 1M+ times full-range scan, while any first np.nan decides the game and no need to scan "the rest"
            if np.any( np.isnan( dtdWIN ) ):                            #        "density" of np.nan-s determine, if this is a good idea to pre-store
               a_num_of_NaNs += 1                                       # .INC
               continue                                                 #        .NOP/LOOP from here, already pre-stored np.nan-s for this case
               # rms[i,j] = np.nan                                      # DUP ( already stored in initialisation ... )
            else:
               #in    = signal.detrend(    win, type = 'linear' )       # REALLY?: in-place modification of DEM-matrix ???
               dtdWIN = signal.detrend( dtdWIN, type = 'linear'   )     #    in scipy-v1.3.1+ can mod in-place,   overwrite_data = True ) # REMOVE OLS-fit-linear trend
               dtdWIN = signal.detrend( dtdWIN, type = 'constant' )     #    in scipy-v1.3.1+ can mod in-place,   overwrite_data = True ) # REMOVE mean
               #z  = np.reshape( win, -1 )                              # AVOID:~1M+ re-counting constant value, known from w directly
               #nz = np.size( z )                                       # AVOID:~1M+ re-counting constant value, known from w directly
               #rootms    = np.sqrt( 1 / ( nz - 1 ) * np.sum( ( z - np.mean( z ) )**2 ) )
               #rms[i,j]  = rootms
               rmsRESULTf32[i,j] = np.sqrt( a_div_by_nz_minus1          # .STO a "scaled"
                                          * np.dot(   dtdWIN,
                                                      dtdWIN.T
                                                      ).sum()
                                          # np.sum( ( dtdWIN            #         SUM of
                                          #       # - dtdWIN.mean()     #               mean-removed ( ALREADY done via scipy.signal.detrend( 'const' ) above )
                                          #           )**2              #               SQUARES
                                          #         )
                                            )                           #      ROOT
    return( a_num_of_NaNs )                                             # ret i4
...