Мой многопроцессорный код Python медленнее, чем последовательный - PullRequest
1 голос
/ 11 июля 2019

Я пытаюсь реализовать технику обработки изображений «Локальная толщина» в python и opencv. Он был реализован в программном обеспечении для анализа изображений под названием ImageJ. В основном для двоичного изображения алгоритм будет

  1. скелетировать любые белые объекты (для создания скелета или гребня)
  2. для каждой точки скелета / ребра, найдите расстояние до ближайшего края
  3. для любой точки в пределах этого расстояния, назначьте значение толщины равным расстоянию или обновите толщину, если расстояние больше существующего значения толщины

Часть, которую я хочу реализовать с помощью многопроцессорной обработки, - это 3. Исходный код здесь . В python я разделил все точки скелета / гребня на куски и передал каждый патрон процессу. Все процессы взаимодействуют через один общий массив, в котором хранится значение толщины. Однако мой многопроцессорный код работает медленнее, чем последовательный, даже для любого процесса, который будет обрабатывать только часть данных.

import numpy as np
import cv2 as cv
import matplotlib.pylab as plt
from skimage.morphology import medial_axis
from scipy.sparse import coo_matrix
import multiprocessing as mp
import time

def worker(sRidge_shared,iRidge,jRidge,rRidge,w,h,iR_worker,worker):
    print('Job starting for worker',worker)
    start=time.time()
    for iR in iR_worker:
        i = iRidge[iR];
        j = jRidge[iR];
        r = rRidge[iR];
        rSquared = int(r * r + 0.5)
        rInt = int(r)
        if (rInt < r): rInt+=1
        iStart = i - rInt
        if (iStart < 0): iStart = 0
        iStop = i + rInt
        if (iStop >= w): iStop = w - 1
        jStart = j - rInt
        if (jStart < 0): jStart = 0
        jStop = j + rInt
        if (jStop >= h): jStop = h - 1
        for j1 in range(jStart,jStop):
            r1SquaredJ =  (j1 - j) * (j1 - j)
            if (r1SquaredJ <= rSquared):
                for i1 in range(iStart,iStop):
                    r1Squared = r1SquaredJ + (i1 - i) * (i1 - i)
                    if (r1Squared <= rSquared):
                        if (rSquared > sRidge_shared[i1+j1*w]):
                            sRidge_shared[i1+j1*w] = rSquared
    print('Worker',worker,' finished job in ',time.time()-start, 's')



def Ridge_to_localthickness_parallel(ridgeimg):
    w, h = ridgeimg.shape
    M = coo_matrix(ridgeimg)
    nR = M.count_nonzero()
    iRidge = M.row
    jRidge = M.col
    rRidge = M.data
    sRidge = np.zeros((w*h,))
    sRidge_shared = mp.Array('d', sRidge)

    nproc = 10

    p = [mp.Process(target=worker,
                    args=(sRidge_shared,iRidge,jRidge,rRidge,w,h,range(i*nR//nproc,min((i+1)*nR//nproc,nR)),i))
                    for i in range(nproc)]
    for pc in p:
        pc.start()
    for pc in p:
        pc.join()

    a = np.frombuffer(sRidge_shared.get_obj())
    b = a.reshape((h,w))

    return 2*np.sqrt(b)

if __name__ == '__main__':
    mp.freeze_support()
    size = 1024

    img = np.zeros((size,size), np.uint8)
    cv.ellipse(img,(size//2,size//2),(size//3,size//5),0,0,360,255,-1)

    skel, distance = medial_axis(img, return_distance=True)
    dist_on_skel = distance * skel

    start = time.time()
    LT1 = Ridge_to_localthickness_parallel(dist_on_skel)
    print('Multiprocessing elapsed time: ', time.time() - start, 's')

Вот результат:

Serial elapsed time:  71.07010626792908 s
Job starting for worker 0
Job starting for worker 1
Job starting for worker 2
Job starting for worker 3
Job starting for worker 4
Job starting for worker 5
Job starting for worker 7
Job starting for worker 6
Job starting for worker 8
Job starting for worker 9
Worker 0  finished job in  167.6777663230896 s
Worker 9  finished job in  181.82518076896667 s
Worker 1  finished job in  211.21311926841736 s
Worker 8  finished job in  211.43014097213745 s
Worker 7  finished job in  235.29852747917175 s
Worker 2  finished job in  241.1481122970581 s
Worker 6  finished job in  242.3452320098877 s
Worker 3  finished job in  247.0727047920227 s
Worker 5  finished job in  245.52154970169067 s
Worker 4  finished job in  246.9776954650879 s
Multiprocessing elapsed time:  256.9716944694519 s
>>>

Я запускаю это на машине с Windows. Я не пробовал многопоточность, так как не знаю, как получить доступ к общему массиву для многопоточности.

Edit:

Я использовал sharedmem и Thread / ThreadPoolExecutor. Результат стал лучше, чем многопроцессорная, но не последовательная.

Serial elapsed time:  67.51724791526794 s
Job starting for worker 0
Job starting for worker 1
Job starting for worker 2
Job starting for worker 3
Job starting for worker 4
Job starting for worker 6
Job starting for worker 5
Job starting for worker 7
Job starting for worker 8
Job starting for worker 9
Job starting for worker 10
Job starting for worker 11
Job starting for worker 12
Job starting for worker 13
Job starting for worker 14
Job starting for worker 15
Job starting for worker 16
Job starting for worker 17
Job starting for worker 18
Job starting for worker 19
Worker 2  finished job in  60.84959959983826 s
Worker 3  finished job in  63.856611013412476 s
Worker 4  finished job in  67.02961277961731 s
Worker 16  finished job in  68.00975942611694 s
Worker 15  finished job in  70.39874267578125 s
Worker 1  finished job in  75.65659618377686 s
Worker 14  finished job in  76.97173047065735 s
Worker 9  finished job in  78.4876492023468 s
Worker 0  finished job in  87.56459546089172 s
Worker 7  finished job in  89.86062669754028 s
Worker 17  finished job in  91.72178316116333 s
Worker 8  finished job in  94.22166323661804 s
Worker 19  finished job in  93.27084946632385 s
Worker 13  finished job in  95.02370047569275 s
Worker 5  finished job in  98.98063397407532 s
Worker 18  finished job in  97.57283663749695 s
Worker 10  finished job in  103.78466653823853 s
Worker 11  finished job in  105.19767212867737 s
Worker 6  finished job in  105.96561932563782 s
Worker 12  finished job in  105.5306978225708 s
Threading elapsed time:  106.97455644607544 s
>>>

1 Ответ

0 голосов
/ 12 июля 2019

Совместное использование массива несколькими процессами требует огромных затрат.

Как правило, это «оценка» времени для мультиобработки:

  • Время поделиться всеми данными
  • Время вычислений (которое должно быть медленнее, чем ваши последовательные вычисления, так как оно должно вычисляться меньше)
  • Агрегировать результаты.

Здесь я очень подозреваю, что первый шаг с огромными затратами (большие массивы)

Как правило, вы можете легко обрабатывать / обрабатывать несколько потоков как часть кода, которая может быть легко разделена (для которой не нужен полный массив)

...