Я пытаюсь обучить несколько CNN на разных графических процессорах, например, по одной сети на каждый графический процессор.Поскольку я хочу, чтобы все сети работали одновременно, я использую многопроцессорную обработку для одновременного начала обучения.
Я смог запустить его с приведенными ниже кодами, однакоповедение программы не является детерминированным.Иногда он выдает ошибки 'Невозможно выбрать SwigpyObject' или 'Невозможно выбрать объект _thread.rlock' .Кроме того, иногда он застревает в queue.get () или self.map_async (func, iterable, chunksize) .get () .Я посмотрел и нашел людей, предлагающих использовать пафос .Я пытался это сделать, но даже тогда поведение не повторяется.
Я использую API-интерфейс набора данных и оценки tenorflow.Кроме того, я использую Queue и Dict из диспетчера многопроцессорности (также пробовал многопроцессорность), чтобы отслеживать доступные графические процессоры, и они распределяются между процессами.
Это мой код CNN
import numpy as np
import os
import tensorflow as tf
import sys
class DNN():
def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
self._epochs = epochs
self._batch_size = batch_size
self._learning_rate = learning_rate
self._data_dir = data_dir
if verbose:
tf.logging.set_verbosity(tf.logging.INFO)
else:
tf.logging.set_verbosity(tf.logging.ERROR)
self._image_shape = [300,300,3]
def _build_net(self,inp):
with tf.name_scope('net'):
x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
x = tf.layers.flatten(x)
x = tf.layers.dense(x, 50, name = 'dense1')
x = tf.layers.dense(x, self.number_of_classes, name='output')
return x
def _loss_fn(self,logits,labels,mode):
if mode == tf.estimator.ModeKeys.TRAIN:
loss_name = 'train_loss'
elif mode == tf.estimator.ModeKeys.EVAL:
loss_name = 'eval_loss'
with tf.name_scope(loss_name):
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
return loss
def _opt_fn(self,loss):
with tf.name_scope('optimizer'):
opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
return opt
def _acc(self,predictions,labels):
with tf.name_scope('acc'):
acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
return acc
def _inp_fn(self,dataset,mode):
def input_parser(img_path, label):
# convert the label to one-hot encoding
one_hot = tf.one_hot(label, self.number_of_classes)
one_hot = tf.reshape(one_hot,[self.number_of_classes])
# read the img from file
img_file = tf.read_file(img_path)
img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
img_decoded = tf.cast(img_decoded, tf.float32)
img_decoded = tf.reshape(img_decoded,self._image_shape)
return img_decoded, one_hot
dataset = tf.data.Dataset.from_tensor_slices(dataset)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)
elif mode == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
dataset = dataset.prefetch(1)
iterator = dataset.make_one_shot_iterator()
return iterator.get_next()
def _model_fn(self,features,labels,mode,params):
logits = self._build_net(features)
loss = self._loss_fn(logits,labels,mode)
predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
"probabilities": tf.nn.softmax(logits,name = "softmax")
}
acc_op = self._acc(predictions = predictions["classes"], labels = labels)
if mode == tf.estimator.ModeKeys.TRAIN:
opt = self._opt_fn(loss)
tf.summary.scalar('train_accuracy', acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)
elif mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)
elif mode == tf.estimator.ModeKeys.EVAL:
eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
eval_metric_ops = {"val_accuracy": eval_acc}
tf.summary.scalar('val_accuracy',acc_op)
return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)
def train(self, train_data, val_data, queue, dict):
x_train, y_train = train_data
x_eval, y_eval = val_data
run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)
if not os.path.isdir(self.clf.eval_dir()):
os.makedirs(self.clf.eval_dir())
gpu_id = queue.get()
os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
with tf.device('\gpu:'+ gpu_id):
out = tf.estimator.train_and_evaluate(
self.clf,
train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
)
print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
queue.put(gpu_id)
return out[0]['accuracy']
код для вызова многопроцессорной обработки выглядит следующим образом:
#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill
def train_pp(dnn, dict, man, pool):
queue = man.Queue(dnn.NUM_GPUS)
[queue.put(i) for i in range(dnn.NUM_GPUS)]
all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
#queue.join()
pool.close()
pool.join()
pool.clear()
pool.restart()
return all_acc, dict
if __name__ == "__main__":
dnn = DNN()
pool = Pool(processes=dnn.NUM_GPUS)
man = Manager()
dict = man.dict()
net_acc, dict = train_pp(dnn,dict,man,pool)
Каков наилучший способ одновременного обучения нескольких сетей?В чем проблема с многопроцессорной обработкой, которая показывает случайное поведение и как ее можно решить?
PS: дополнительный, не по теме вопрос: как должен осуществляться обмен данными при обучении всех сетей?по тем же данным?В настоящее время я считаю, что многопроцессорность создает несколько копий кода DNN, поэтому у каждого графического процессора есть свой итератор.Это лучший способ?Как получить данные о процессоре и обмениваться ими со всеми процессами без блокировок?