Как воспользоваться преимуществами многопроцессорности и многопоточности в процессе глубокого обучения с использованием Keras? - PullRequest
6 голосов
/ 28 мая 2019

Я бы предположил, что большинство фреймворков, таких как keras / tenorflow / ..., автоматически используют все ядра процессора, но на практике кажется, что это не так. Я просто смог найти несколько источников, которые могут привести нас к использованию всей мощности ЦП в процессе глубокого обучения. Я нашел статью , которая написана об использовании

from multiprocessing import Pool 
import psutil
import ray 

с другой стороны, на основе этого ответа за использование модели keras в нескольких процессах вышеупомянутые библиотеки не отслеживаются. Есть ли более элегантный способ воспользоваться Multiprocessing для Keras, поскольку он очень популярен для реализации.

  • Например, как можно изменить следующую простую реализацию RNN для достижения как минимум 50% загрузки ЦП в процессе обучения?

  • Должен ли я использовать 2-ю модель как многозадачность, такую ​​как LSTM, которую я комментирую ниже? Я имею в виду, можем ли мы одновременно управлять несколькими моделями, используя больше ресурсов процессора?

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop

df = pd.read_csv("D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i%3==2]

Y_train= df[index]
df = df.values

#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
    dataX,dataY = [],[]
    print("Len:",len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back,  :])
    return np.array(dataX), np.array(dataY)

Y_train=np.array(Y_train)
df=np.array(df)

look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)

#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)

#Shape of train and test data
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)


model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)


#predict

Y_train=np.array(trainY)
Y_test=np.array(testX)

Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)

train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)

# create and fit the Simple LSTM model as 2nd model for multi-tasking

#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

#Y_train=np.array(trainY)
#Y_test=np.array(testX)

#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)

#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
    plt.subplot(1, 2, 1)
    ax=plt.plot(hist_RNN.history['loss']    ,label='Train loss')
    ax=plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title(' RNN Loss on Train and Test data')
    plt.legend()
    plt.subplot(1, 2, 2)
    ax=plt.plot(hist_LSTM.history['loss']    ,label='Train loss')
    ax=plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
    plt.xlabel('Training steps (Epochs = 50)')
    plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
    plt.title('LSTM Loss on Train and Test data')
    plt.legend()

    plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
    #plt.savefig('All_Losses_history_.png')
    plt.show()

Примечание У меня нет доступа к CUDA , я просто получаю доступ к мощному серверу без VGA. Моя цель - использовать многопроцессорность и многопоточность для максимальной загрузки ЦП вместо 30%, это означает, что у меня только одно ядро, а у меня четырехъядерный! Любой совет будет принята с благодарностью. Я загрузил отформатированный csv набор данных.

Обновление: Моя конфигурация HW следующая:

  • Процессор: AMD A8-7650K Radeon R7 10 вычислительных ядер 4C + 6G 3,30 ГГц
  • RAM: 16 ГБ
  • ОС: Win 7
  • Python ver 3.6.6
  • Tensorflow ver 1.8.0
  • Keras ver 2.2.4

1 Ответ

8 голосов
/ 04 июня 2019

Хорошо, что тренировка одной модели не использует все 100% вашего процессора! Теперь у нас есть пространство для параллельного обучения нескольких моделей и ускорения общего времени обучения.

Примечание: если вы хотите просто ускорить эту модель, посмотрите на графические процессоры или измените гиперпараметры, такие как размер пакета и количество нейронов (размер слоя).

Вот как вы можете использовать multiprocessing для одновременного обучения нескольких моделей (используя процессы, выполняющиеся параллельно на каждом отдельном ядре ЦП вашей машины).

multiprocessing.Pool в основном создает пул работ, которые нужно выполнять. Процессы подберут эти задания и запустят их. Когда работа завершена, процесс выберет другую работу из пула.

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)


num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, hyperparams)

print(scores)

Выход:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

Это легко продемонстрировать с помощью time.sleep в коде. Вы увидите, что все 3 процесса начинают учебную работу, а затем все они заканчиваются примерно в одно и то же время. Если бы это было обработано один раз, вам пришлось бы ждать завершения каждого из них, прежде чем начинать следующее (зевать!).

EDIT ОП также хотел полный код. Это сложно при переполнении стека, потому что я не могу тестировать в вашей среде и с вашим кодом. Я взял на себя смелость скопировать и вставить ваш код в мой шаблон выше. Возможно, вам придется добавить некоторые импортные данные, но это настолько близко, насколько вы доберетесь до «запускаемого» и «полного» кода.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Add KeyboardInterrupt exception to mutliprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv("D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

num_workers = 2
model_types = ['rnn', 'lstm']

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, model_types)

print(scores)

Вывод программы:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
...