Как прогнозировать несколько изображений в Keras одновременно, используя множественную обработку (например, с разными процессорами)? - PullRequest
1 голос
/ 06 июля 2019

У меня есть много изображений PNG, которые я хочу классифицировать, используя обученную модель CNN.

Чтобы ускорить процесс, я хотел бы использовать множественную обработку с ЦП (у меня есть 72 доступных,здесь я просто использую 4).В данный момент у меня нет доступного графического процессора, но при необходимости я могу получить его.

Мой рабочий процесс:

  1. читатьрисунок с openCV

  2. адаптировать форму и формат

  3. использовать mymodel.predict(img), чтобы получить вероятность для каждого класса

Когда дело доходит до шага прогнозирования, он никогда не завершает шаг mymodel.predict(img).Когда я использую код без многопроцессорного модуля, он работает нормально.Для этой модели я использую кераты с бэкэндом тензорного потока.

# load model
mymodel = load_model('190704_1_fcs_plotclassifier.h5')

# use python library multiprocessing to use different CPUs
import multiprocessing as mp

pool = mp.Pool(4)

# Define callback function to collect the output in 'outcomes'
outcomes = []

def collect_result(result):
    global outcomes
    outcomes.append(result)

# Define prediction function

def prediction(img):
    img = cv2.resize(img,(49,49))
    img = img.astype('float32') / 255
    img = np.reshape(img,[1,49,49,3])       

    status = mymodel.predict(img)
    status = status[0][1]

    return(status)

# Define evaluate function

def evaluate(i,figure):

    # predict the propability of the picture to be in class 0 or 1
    img = cv2.imread(figure)
    status = prediction(img)

    outcome = [figure, status]
    return(i,outcome)

# execute multiprocessing
for i, item in enumerate(listoffigurepaths):
        pool.apply_async(evaluate, args=(i, item), callback=collect_result)
pool.close()
pool.join()

# get outcome
print(outcomes)

Было бы здорово, если бы кто-то знал, как предсказывать несколько изображений одновременно!

Я упростил свой код здесь, но если у кого-нибудь есть пример, как это можно сделать, я был бы очень признателен.

Ответы [ 2 ]

0 голосов
/ 06 июля 2019

Имеет ли значение скорость обработки
или объем ОЗУ
или количество ядер ЦП
или задержка дополнительной обработки?
ВСЕ ЭТО ДЕЛАЮТ:

Модуль python multiprocessing известен (и joblib делает то же самое):

Пакет multiprocessing предлагает как локальный, так и удаленный параллелизм, эффективно обходя блокировку Global Interpreter Lock , используя подпроцессы вместо потоков.

Тем не менее, как и все в нашей Вселенной, это происходит по цене:

Желание, выраженное O / P как:

Чтобы ускорить процесс, я бы хотел использовать множественную обработку с процессорами (у меня есть 72 в наличии

будет, для такого рода аналогичного приложения предварительно обученного mymodel.predict() -или или, если отправлено в Pool( 72 ) -исполнение, почти наверняка задушит почти любую аппаратную оперативную память путем замены.

Вот пример, в котором директива n_jobs = 100 порождает "just" -Do-Nothing работника - чтобы увидеть, что происходит (по времени ~ 532+ [мс] lost + memory- в случае выделения, когда X / Z [ГБ] или RAM были немедленно выделены O / S):

enter image description here

Это происходит из-за того факта, что каждый multiprocessing порожденный подпроцесс (не потоки, как O / P уже испытал сам по себе) сначала создается (после адекватной задержки надстройки) благодаря O / S-процессу / управлению распределением ОЗУ) как --- FULL-COPY --- экосистемы, присутствующей в исходном процессе python (полный python интерпретатор + все его import -данные модули + все его внутреннее состояние и структуры данных - используются или нет - так что действительно происходит огромное количество ОЗУ (заметили ли вы, что платформа начала SWAP - обратите внимание, сколько подпроцессов было порождено до того времени, и у вас есть предел того, сколько таких может поместиться в оперативной памяти, и это приводит к разрушительным эффектам производительности, если вы пытаетесь (или позволяете, используя joblib -s n_jobs = -1 директива автоматического масштабирования), чтобы заполнить больше подпроцессов, чем этот вводящий SWAP номер ...

Пока что хорошо, мы заплатили некоторое (часто за тщательно разработанный код, разумно ничтожное количество, если сравнивать с полным повторным обучением всего предиктора, не так ли?) Время, чтобы порождать некоторое количество параллельных процессов.

Если распределенная рабочая нагрузка затем возвращается, к одному общему ресурсоемкому ресурсу (дисковому каталогу с файлами), производительность параллельных процессов падает, но в аварийном состоянии - она ​​должна ждать такого ресурса (!) чтобы сначала получить его снова бесплатно.

Наконец, даже «правильное» количество подпроцессов, порожденных Pool(), таких, что не позволяет оператору O / S начать замену оперативной памяти на диск и обратно, межпроцессное взаимодействие очень дорого - здесь, сериализация (Pickling / unPickling) + enQueueing + deQueueing всех DATA-объектов, нужно пройти туда и обратно (да, даже для callback веселья), так что меньше отправляет, тем быстрее станет Pool -обработка.

Здесь все процессы, связанные с Pool, могут выиграть от независимой регистрации результатов, что может уменьшить как масштабы, так и задержку межпроцессного взаимодействия, но также консолидировать результаты, сообщенные любым числом работников, в общий журнал.


Как ...? Сначала сравните стоимость каждого шага:

Без достоверных фактов (измеренная длительность в [us]) остается только мнение.

def prediction( img ):
    img = cv2.resize( img, ( 49, 49 ) ) 
    img = img.astype( 'float32' ) / 255
    img = np.reshape( img, [1, 49, 49, 3] )       

    status = mymodel.predict( img )
    status = status[0][1]

    return( status )

def evaluate( i, figure ):  # predict the propability of the picture to be in class 0 or 1
    img = cv2.imread( figure )
    status = prediction( img )

    outcome = [figure, status]

    return( i, outcome )
#--------------------------------------------------
from zmq import Stopwatch
aClk = Stopwatch()
#------------------------------------NOW THE COSTS OF ORIGINAL VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start(); _ = evaluate( 1, aFigureNAME ); A = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "as-is took {0:}[us]".format( A ) );aListOfRESULTs.append( A )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

Давайте попробуем что-нибудь еще:

def eval_w_RAM_allocs_avoided( indexI, aFigureNAME ):
    return [ indexI,
             [ aFigureNAME,
               mymodel.predict( ( cv2.resize( cv2.imread( aFigureNAME ),
                                              ( 49, 49 )
                                              ).astype( 'float32' ) / 255
                                  ).reshape( [1, 49, 49, 3]
                                             )
                                )[0][1],
               ],
             ]

#------------------------------------NOW THE COSTS OF MOD-ed VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    _ = eval_w_RAM_allocs_avoided( 1, aFigureNAME )
    B = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "MOD-ed took {0:}[us] ~ {1:} x".format( B, float( B ) / A ) )
    aListOfRESULTs.append( B )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

А фактические img накладные расходы конвейера предварительной обработки:

#------------------------------------NOW THE COSTS OF THE IMG-PREPROCESSING
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    aPredictorSpecificFormatIMAGE = ( cv2.resize( cv2.imread( aFigureNAME ),
                                                  ( 49, 49 )
                                                  ).astype( 'float32' ) / 255
                                      ).reshape( [1, 49, 49, 3]
                                                 )
    C = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "IMG setup took {0:}[us] ~ {1:} of A".format( C, float( C ) / A ) )
    aListOfRESULTs.append( C )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

Фактические операции ввода / вывода:

#------------------------------------NOW THE COSTS OF THE IMG-FILE-I/O-READ
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aFileNAME = listoffigurepaths[158 + iii * 172]
    aClk.start()
    _ = cv2.imread( aFileNAME )
    F = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "aFileIO took {0:}[us] ~ {1:} of A".format( F, float( F ) / A ) )
    aListOfRESULTs.append( F )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

Без этих неопровержимых фактов (в виде количественной записи доказательств) вряд ли можно было бы решить, какой будет наилучший шаг повышения производительности для обработки данных в масштабном режиме прогнозирования.

Проверив эти элементы, опубликуйте результаты и дальнейшие шаги (будь то переход через multiprocessing.Pool или использование другой стратегии для увеличения производительности до любой более высокой производительности) можно сначала получить разумную оценку, так какДля этого были собраны серьезные факты.

0 голосов
/ 06 июля 2019

Я знаю, что вам может помочь один пакет python joblib.Надеюсь, что это может решить вашу проблему.

from joblib import Parallel, delayed
# load model
mymodel = load_model('190704_1_fcs_plotclassifier.h5')

# Define callback function to collect the output in 'outcomes'
outcomes = []

def collect_result(result):
    global outcomes
    outcomes.append(result)

# Define prediction function

def prediction(img):
    img = cv2.resize(img,(49,49))
    img = img.astype('float32') / 255
    img = np.reshape(img,[1,49,49,3])       

    status = mymodel.predict(img)
    status = status[0][1]

    return(status)

# Define evaluate function

def evaluate(i,figure):

    # predict the propability of the picture to be in class 0 or 1
    img = cv2.imread(figure)
    status = prediction(img)

    outcome = [figure, status]
    return(i,outcome)

outcomes = Parallel(n_jobs=72)(delayed(evaluate)(i,figure) for figure in listoffigurepaths)
...