Используйте керасы в многопроцессорной обработке - PullRequest
0 голосов
/ 11 мая 2018

Это в основном дубликат: Keras + Tensorflow и Multiprocessing в Python Но мои настройки немного отличаются, и их решение не работает для меня.

Мне нужнообучить модель keras против предсказаний, сделанных из другой модели.Предсказания связаны с некоторым загруженным кодом ЦП, поэтому я хотел бы распараллелить их и запустить код в рабочих процессах.Вот код, который я хотел бы выполнить:

import numpy as np

from keras.layers import Input, Dense
from keras.models import Model
from keras.optimizers import Adam

def create_model():
    input_layer = Input((10,))
    dense = Dense(10)(input_layer)

    return Model(inputs=input_layer, outputs=dense)

model_outside = create_model()
model_outside.compile(Adam(1e-3), "mse")

def subprocess_routine(weights):
    model_inside = create_model()
    model_inside.set_weights(weights)

    while True:
        # lots of CPU
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)

        yield batch, prediction

weights = model_outside.get_weights()

model_outside.fit_generator(subprocess_routine(weights),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)

Это приводит к ошибке

E tennflowflow / core / grappler / clusters / utils.cc: 81] Не удалось получитьсвойства устройства, код ошибки: 3

Я нашел вышеупомянутый вопрос, ответ - переместить импорт keras в подпроцесс.Я добавил весь импорт в subprocess_routine.Но это не меняет ошибку.Вероятно, было бы необходимо полностью исключить импорт keras из основного процесса, но в моей настройке это означало бы огромные рефакторинги.

Keras + многопоточность, кажется, работает.В этом выпуске прокрутите вниз до самого последнего комментария: https://github.com/keras-team/keras/issues/5640 В моем коде это выглядит так:

model_inside = create_model()
model_inside._make_predict_function()

graph = tf.get_default_graph()

def subprocess_routine(model_inside, graph):

    while True:
        batch = np.random.rand(10, 10)

        with graph.as_default():
            prediction = model_inside.predict(batch)

        yield batch, prediction

model_outside.fit_generator(subprocess_routine(model_inside, graph),
                            epochs=10,
                            steps_per_epoch=100,
                            use_multiprocessing=True,
                            workers=1)

Но сообщение об ошибке идентично.

Так какпроблема, по-видимому, связана с инициализацией подпроцессов, я пытался создать новый сеанс в каждом подпроцессе:

def subprocess_routine(weights):

    import keras.backend as K
    import tensorflow as tf
    sess = tf.Session()
    K.set_session(sess)

    model_inside = create_model()
    model_inside.set_weights(weights)

    while True:
        batch = np.random.rand(10, 10)
        prediction = model_inside.predict(batch)

        yield batch, prediction

Это приводит к изменению того же сообщения об ошибке:

E tenorflow / stream_executor / cuda / cuda_driver.cc: 1300] не удалось получить число устройств CUDA: CUDA_ERROR_NOT_INITIALIZED

Итак, опять инициализация кажется неработоспособной.

Как я могу запустить keras обав моем основном процессе и подпроцессах, порожденных многопроцессорностью?

Ответы [ 2 ]

0 голосов
/ 13 апреля 2019

Этот метод не работает для меня.

Я загружаю свою сохраненную модель и передаю ее в качестве аргумента.Мое сообщение об ошибке немного отличается от сообщения.Это

E tensorflow/core/grappler/clusters/utils.cc:83] Failed to get device properties, error code: 3

У меня нет проблем с запуском вне многопроцессорной обработки.Кроме того, если это что-то значит, я использую изображение докера tenorflow / tenorflow-gpu-py3 версия 1.13.1

Ниже приведен мой код обнаружения объектов, который берет изображение и производит несколько масштабов этого изображения (называется пирамида изображения).Затем он обрабатывает одну шкалу за раз.Для каждого масштаба он разбирает изображение на меньшие окна и затем отправляет каждое окно процессору.Затем процессор использует model.evaluate([window],[1]), чтобы проверить, содержит ли текущее окно мой объект.Если вероятность высока, информация о окне будет сохранена в очереди и получена позднее (вместе со значениями из других процессов)

Вот мой код:

def start_detection_mp3(image,winDim, minSize,  winStep=4, pyramidScale=1.5, minProb=0.7):
    # Code to use multiple processors (mp)
    boxes=[]
    probs=[]
    print("Loading CNN Keras Model .... ")
    checkpoint_path="trainedmodels/cp.ckpt"
    mymodel=create_CNN_model(2,winDim[0],winDim[1])
    mymodel.load_weights(checkpoint_path)
    mymodel._make_predict_function()
    (keepscale,keeplayer)=CalculateNumberOfScales(image,pyramidScale,minSize)
    printinfo("There are {} scales in this image.".format(len(keepscale)))
    for i in range(0,len(keepscale)):
        printinfo("Working on layer {0:4d}. Scale {1:.2f}".format(i,keepscale[i]))
        (b,p)=detect_single_layer_mp3(keeplayer[i],keepscale[i],winStep,winDim,minProb,mysess,mymodel)

        boxes =boxes + b
        probs =probs + p
    mysess.close()
    return(boxes,probs)

def detect_single_layer_mp3(layer,scale,winStep,winDim,minProb,mysess,mymodel): 
    # Use multiple processors
    q=[]
    p=[]
    d=[]
    i=0
    boxes=[]
    probs=[]
    xx, yy, windows= sliding_window_return(layer, winStep, winDim)
    # process in chunks of 4 (for four processors)
    NumOfProcessors=4;
    for aa in range(0,len(xx)-1,4):
        for ii in range(0,NumOfProcessors):
            ##print("aa: {}  ii: {}".format(aa,ii))
            printinfo("Processes {} of Loop {}".format(ii,aa))
            x=xx[aa]
            y=yy[aa]
            window=windows[aa]
            q=Queue() # Only need to create one Queue (FIFO buffer) to hold output from each process
            # when all processes are completed, the buffer will be emptied.
            p.append(Process(target=f2,args=(x,y,window,scale, minProb,winDim,q,mysess,mymodel)))
            pp=p[-1] # get last
            printinfo("Starting process {}".format(pp))
            pp.start()
            pp.join()

        while not q.empty():
            d=q.get()
            boxes = boxes + d[0]
            probs = probs + d[1]

        p=[]  # Clear Processes    
        p=[]
        q=[]   

    return(boxes,probs)


def f2(x,y,window,scale,minProb,winDim,q,mysess,mymodel):
    processID = os.getpid()
    boxes=[]
    probs=[]
    isHOG = 0
    isCNN = 0
    isCNN_Keras=1
    (winH, winW) = window.shape[:2]
    if winW == winDim[0] and winH ==winDim[1]: # Check that window dimension is 
        if isCNN_Keras ==1:
            ### TODO  It appears that it is freezing at the prediction step                     
            printinfo("Process id: {} Starting test against CNN model".format(processID))
            window=window.reshape(-1,winH,winW,1)
            loss,prob = mymodel.evaluate([window],[1])
            print("Loss: {}  Accuracy: {}".format(loss,prob))

            if prob > minProb:
                printinfo("*** [INFO] ProcessID: {0:7d} Probability: {1:.3f}  Scale {2:.3f} ***".format(processID,prob,scale))
                # compute the (x, y)-coordinates of the bounding box using the current
                # scale of the image pyramid
                (startX, startY) = (int(scale * x), int(scale * y))
                endX = int(startX + (scale * winW))
                endY = int(startY + (scale * winH))

                # update the list of bounding boxes and probabilities
                boxes.append((startX, startY, endX, endY))
                probs.append(prob)      
    # return a tuple of the bounding boxes and probabilities            
    if q!=1:        
        q.put([boxes,probs])
        q.close()
        q=[]
    else:
        return(boxes,probs)
0 голосов
/ 17 июня 2018

Хорошая новость заключается в том, что сеансы тензорного потока являются поточно-ориентированными: Является ли он поточно-ориентированным при использовании tf.Session в службе вывода?

Чтобы использовать модель keras в нескольких процессах, вы должны сделать следующее:

  • настроить модель
  • вызов _make_predict_function()
  • установить сеанс и использовать его для получения графика тензорного потока
  • доработать этот график
  • каждый раз, когда вы что-то предсказываете, поставьте этот график as_default_graph()

Вот пример кода:

# the usual imports
import numpy as np
import tensorflow as tf

from keras.models import *
from keras.layers import *

# set up the model
i = Input(shape=(10,))
b = Dense(1)(i)
model = Model(inputs=i, outputs=b)

# now to use it in multiprocessing, the following is necessary
model._make_predict_function()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
default_graph = tf.get_default_graph()
default_graph.finalize()

# now you share the model and graph between processes
# in each process you can call this:
with default_graph.as_default():
    return model.predict(something)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...