Распараллеливание предсказаний модели в кератах с использованием многопроцессорной обработки для python - PullRequest
0 голосов
/ 28 марта 2020

Я пытаюсь выполнить прогнозирование модели параллельно, используя команду model.predict, предоставленную keras в python2. Я использую тензор потока 1.14.0 для python2. У меня есть 5 файлов модели (.h5), и я хотел бы, чтобы команда предсказания выполнялась параллельно. Это выполняется в python 2.7. Я использую многопроцессорный пул для сопоставления имен файлов модели с помощью функции прогнозирования для нескольких процессов, как показано ниже,

import matplotlib as plt
import numpy as np
import cv2
from multiprocessing import Pool
pool=Pool()
def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model=load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
start_time=time.time()
res=pool.map(prediction,models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)

Входные данные - это массив изображений numpy, полученный из другой части кода. Но при выполнении этого я получаю следующее:

Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
    return recv()
AttributeError: 'module' object has no attribute 'prediction'
AttributeError: 'module' object has no attribute 'prediction'

Я не могу интерпретировать это сообщение об ошибке, и как мне go решить эту проблему? Любой совет очень ценится!

ОБНОВЛЕНИЕ 2: Спасибо за все ссылки и полный пример @sokato. Я выполнил точный код, отправленный @sokato, однако я получил следующую ошибку (я тоже внес изменения в свой код и получил ту же ошибку, показанную ниже),

Traceback (most recent call last):
  File "stackoverflow.py", line 47, in <module>
    with multiprocessing.Pool() as p:
AttributeError: __exit__

ОБНОВЛЕНИЕ3: * 1015 Спасибо за поддержку. Я думаю, что проблема в UPDATE2 была вызвана использованием python2 вместо python3. Я смог решить ошибку, указанную в UPDATE2 для python2, используя with closing(multiprocessing.Pool()) as p: вместо просто with multiprocessing.Pool() as p: в коде @ sokato. Импортируйте функцию закрытия следующим образом: from contextlib import closing

НОВЫЙ ПРОБЛЕМА С ИСПОЛЬЗОВАНИЕМ ПОДХОДА, ПОКАЗАННОГО НИЖЕ,

У меня фактически есть несколько входных данных. Вместо загрузки каждой модели Время для каждого ввода Я хочу загрузить все модели заранее и сохранить его в списке. Я сделал это, как показано ниже,

import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
loaded_models=[]
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))

def prediction(input_tuple):
    inputs,loaded_models=input_tuple
    predops=[]
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops=[]
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key = actops.count) 
    return max_freqq

#....some pre-processing....#

    '''new_all_t is a list which contains tuples and each tuple has inputs from all_t 
    and the list containing loaded models which will be extracted
 in the prediction function.'''

new_all_t=[]
for elem in all_t:
    new_all_t.append((elem,loaded_models))
start_time=time.time()
with closing(multiprocessing.Pool()) as p:
    predops=p.map(prediction,new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))

new_all_t - это список, который содержит кортежи, и каждый кортеж имеет входные данные из all_t и список, содержащий загруженные модели, которые будут извлечены в функции прогнозирования. Однако я получаю следующая ошибка сейчас,

Traceback (most recent call last):
  File "trial_mult-ips.py", line 240, in <module>
    predops=p.map(prediction,new_all_t)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.

Что именно это означает? Как мне go решить эту проблему?

UPDATE4: Я включил строки tf.compat.v1.enable_eager_execution() и tf.compat.v1.enable_v2_behavior() в самом начале. Теперь я получаю следующую ошибку:

WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Traceback (most recent call last):
  File "the_other_end-mp.py", line 216, in <module>
    predops=p.map(prediction,modelon)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
ValueError: Resource handles are not convertible to numpy.

Я не могу интерпретировать это сообщение об ошибке, и как мне go решить эту проблему? Любой совет очень ценится!

1 Ответ

1 голос
/ 29 марта 2020

Итак, я не уверен в некоторых ваших дизайнерских решениях, но я приложил все усилия для получения данной информации. В частности, я думаю, что, возможно, есть некоторые проблемы с глобальной переменной и оператором импорта в вашей параллельной функции.

  1. Вы должны использовать общие переменные, а не глобальные переменные для совместного использования ввода между процессами. Вы можете прочитать больше о разделяемой памяти, если хотите, в многопроцессорной документации.

  2. Я сгенерировал модели из учебника, поскольку ваши модели не включены.

  3. Вы не присоединяетесь или не закрываете свой пул, но с помощью следующего кода я смог получить код для параллельного выполнения. Вы можете закрыть пул, вызвав pool.close() или используя синтаксис «с», показанный ниже. Обратите внимание, что синтаксис with не применяется к python 2.7.

import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28) #model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
               loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
               metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=5)

    for mod in models:
        model.save(mod)

def prediction(model_name):

    model=tf.keras.models.load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double, 
    int(np.prod(testShape)),lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]

    # with multiprocessing.Pool() as p:  #Use me for python 3
    p = multiprocessing.Pool() #Use me for python 2.7
    start_time=time.time()
    res=p.map(prediction,models)
    p.close() #Use me for python 2.7
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...