Многопроцессорная обработка Python Pool.map вызывает aquire? - PullRequest
12 голосов
/ 22 сентября 2010

У меня есть numpy.array размером 640x480 изображений, каждое из которых имеет длину 630 изображений. Таким образом, общий массив составляет 630x480x640. Я хочу создать среднее изображение, а также рассчитать стандартное отклонение для каждый пиксель на всех 630 изображениях.

Это легко сделать с помощью

avg_image = numpy.mean(img_array, axis=0)
std_image = numpy.std(img_array, axis=0)

Однако, так как я запускаю это для 50 или около того таких массивов, и есть Рабочая станция с 8 ядрами / 16 нитями, я решил, что стану жадным и распараллелим вещи multiprocessing.Pool.

Итак, я сделал следующее:

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

Однако я увидел только небольшое ускорение. Поместив операторы печати в chunk_avg_map, я смог определить, что одновременно запускается только один или два процесса, а не 16 (как я и ожидал).

Затем я запустил свой код через cProfile в iPython:

%prun current_image_anal.main()

Результат показал, что наибольшее время было потрачено на звонки для приобретения:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1527  309.755    0.203  309.755    0.203 {built-in method acquire}

Что, как я понимаю, связано с блокировкой, но я не понимаю, почему мой код будет это делать. У кого-нибудь есть идеи?

[РЕДАКТИРОВАТЬ] В соответствии с запросом, это исполняемый скрипт, который демонстрирует проблему. Вы можете профилировать его любыми способами, но когда я это сделал, я обнаружил, что львы часть времени была занята разговорами о приобретении, а не о значении или ооо, как я бы ожидали.

#!/usr/bin/python
import numpy
import multiprocessing

def main():
    fake_images = numpy.random.randint(0,2**14,(630,480,640))
    chunk_avg(fake_images)

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    main()

1 Ответ

7 голосов
/ 23 сентября 2010

Я считаю, что проблема в том, что количество процессорного времени, которое требуется для обработки каждого куска, невелико по сравнению с количеством времени, которое требуется для копирования ввода и вывода в и из рабочих процессов.Я изменил ваш пример кода, чтобы разделить вывод на 16 четных порций и распечатать разницу во времени процессора (time.clock()) между началом и окончанием цикла chunk_avg_map().В моей системе каждый отдельный прогон занимал чуть меньше секунды процессорного времени, но общее использование процессорного времени для группы процессов (система + пользовательское время) составляло более 38 секунд.Очевидные накладные расходы на копирование в 0,75 секунды на фрагмент позволяют вашей программе выполнять вычисления лишь немного быстрее, чем multiprocessing может доставить данные, в результате чего одновременно используются только два рабочих процесса.

Если я изменю код так, чтобы«входные данные» равны xrange(16) и строят случайный массив в пределах chunk_avg_map(), затем я вижу, что время sysem + user падает примерно до 19 секунд, и все 16 рабочих процессов выполняются одновременно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...