Обучение нескольких сетей в системе с несколькими графическими процессорами с использованием тензорного потока и многопроцессорной обработки - PullRequest
0 голосов
/ 26 ноября 2018

Я пытаюсь обучить несколько CNN на разных графических процессорах, например, по одной сети на каждый графический процессор.Поскольку я хочу, чтобы все сети работали одновременно, я использую многопроцессорную обработку для одновременного начала обучения.

Я смог запустить его с приведенными ниже кодами, однакоповедение программы не является детерминированным.Иногда он выдает ошибки 'Невозможно выбрать SwigpyObject' или 'Невозможно выбрать объект _thread.rlock' .Кроме того, иногда он застревает в queue.get () или self.map_async (func, iterable, chunksize) .get () .Я посмотрел и нашел людей, предлагающих использовать пафос .Я пытался это сделать, но даже тогда поведение не повторяется.

Я использую API-интерфейс набора данных и оценки tenorflow.Кроме того, я использую Queue и Dict из диспетчера многопроцессорности (также пробовал многопроцессорность), чтобы отслеживать доступные графические процессоры, и они распределяются между процессами.

Это мой код CNN

import numpy as np
import os
import tensorflow as tf
import sys

class DNN():

    def __init__(self,epochs = 5, batch_size = 64, learning_rate = 0.001, data_dir = None, verbose = False):
        self._epochs = epochs
        self._batch_size = batch_size
        self._learning_rate = learning_rate
        self._data_dir = data_dir 

        if verbose:
            tf.logging.set_verbosity(tf.logging.INFO)
        else:
            tf.logging.set_verbosity(tf.logging.ERROR)

        self._image_shape = [300,300,3]

    def _build_net(self,inp):
        with tf.name_scope('net'): 
            x = tf.layers.conv2d(inp, 3, 8, padding = 'same', name = 'conv1')
            x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool1')
            x = tf.layers.conv2d(x, 5, 8, padding = 'same', name = 'conv2')
            x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool2')
            x = tf.layers.conv2d(x, 5, 16, padding = 'same', name = 'conv3')
            x = tf.layers.max_pooling2d(x, 3, strides = 2, name = 'pool3')
            x = tf.layers.flatten(x) 
            x = tf.layers.dense(x, 50, name = 'dense1')
            x = tf.layers.dense(x, self.number_of_classes, name='output')
        return x

    def _loss_fn(self,logits,labels,mode):
        if mode  == tf.estimator.ModeKeys.TRAIN:
            loss_name = 'train_loss'
        elif mode  == tf.estimator.ModeKeys.EVAL:
            loss_name = 'eval_loss'
        with tf.name_scope(loss_name):
            loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(labels,logits))
        return loss

    def _opt_fn(self,loss):
        with tf.name_scope('optimizer'):
            opt = tf.train.AdamOptimizer(learning_rate=self._learning_rate).minimize(loss,global_step=tf.train.get_global_step())
        return opt

    def _acc(self,predictions,labels):
        with tf.name_scope('acc'):
            acc = tf.reduce_mean(tf.cast(tf.equal(predictions, tf.argmax(labels,-1)), tf.float32))
        return acc

    def _inp_fn(self,dataset,mode):

        def input_parser(img_path, label):

            # convert the label to one-hot encoding
            one_hot = tf.one_hot(label, self.number_of_classes)
            one_hot = tf.reshape(one_hot,[self.number_of_classes])

            # read the img from file
            img_file = tf.read_file(img_path)
            img_decoded = tf.image.decode_jpeg(img_file, channels=self._image_shape[-1])
            img_decoded = tf.image.resize_images(img_decoded,self._image_shape[:2])
            img_decoded = tf.cast(img_decoded, tf.float32)
            img_decoded = tf.reshape(img_decoded,self._image_shape)
            return img_decoded, one_hot

        dataset = tf.data.Dataset.from_tensor_slices(dataset)

        if mode  == tf.estimator.ModeKeys.TRAIN:
            dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = self._epochs, seed = tf.set_random_seed(123)))
            dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
            dataset = dataset.prefetch(1)
            iterator = tf.data.Iterator.from_structure(dataset.output_types,dataset.output_shapes)

        elif mode  == tf.estimator.ModeKeys.PREDICT or tf.estimator.ModeKeys.EVAL:
            dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(self._batch_size,count = 1, seed = tf.set_random_seed(123)))
            dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func = input_parser, batch_size = self._batch_size, num_parallel_batches = 4))
            dataset = dataset.prefetch(1)

        iterator = dataset.make_one_shot_iterator()
        return iterator.get_next()

    def _model_fn(self,features,labels,mode,params):
        logits = self._build_net(features)
        loss = self._loss_fn(logits,labels,mode)
        predictions = {"classes": tf.argmax(input = tf.nn.softmax(logits),axis = -1),
                       "probabilities": tf.nn.softmax(logits,name = "softmax")
                        }
        acc_op = self._acc(predictions = predictions["classes"], labels = labels)    
        if mode  == tf.estimator.ModeKeys.TRAIN:
            opt = self._opt_fn(loss)
            tf.summary.scalar('train_accuracy', acc_op)
            return tf.estimator.EstimatorSpec(mode = mode, loss = loss, train_op = opt)

        elif mode  == tf.estimator.ModeKeys.PREDICT:
            return tf.estimator.EstimatorSpec(mode = mode, predictions = predictions)

        elif mode  == tf.estimator.ModeKeys.EVAL:
            eval_acc = tf.metrics.accuracy(labels=tf.argmax(labels,-1),predictions=predictions["classes"])
            eval_metric_ops = {"val_accuracy": eval_acc}
            tf.summary.scalar('val_accuracy',acc_op)
            return tf.estimator.EstimatorSpec(mode = mode, loss=loss, eval_metric_ops=eval_metric_ops)

    def train(self, train_data, val_data, queue, dict):
        x_train, y_train = train_data
        x_eval, y_eval = val_data

        run_config = tf.estimator.RunConfig(model_dir = os.path.join('logs','model_basic_cnn'), tf_random_seed = tf.set_random_seed(123), keep_checkpoint_max = 1, save_checkpoints_steps = 1000, save_checkpoints_secs = None)
        self.clf = tf.estimator.Estimator(model_fn = self._model_fn, config=run_config)

        if not os.path.isdir(self.clf.eval_dir()):
            os.makedirs(self.clf.eval_dir())

        gpu_id = queue.get()
        os.environ('CUDA_VISIBLE_DEVICES') = gpu_id
        with tf.device('\gpu:'+ gpu_id):
            out = tf.estimator.train_and_evaluate(
                self.clf,
                train_spec = tf.estimator.TrainSpec(input_fn = lambda: self._inp_fn((x_train,y_train),mode=tf.estimator.ModeKeys.TRAIN)),
                eval_spec = tf.estimator.EvalSpec(input_fn = lambda: self._inp_fn((x_eval,y_eval),mode=tf.estimator.ModeKeys.EVAL))
                )
        print(f"Validation accuracy: {out[0]['accuracy']:.4f}")
        queue.put(gpu_id)
        return out[0]['accuracy']

код для вызова многопроцессорной обработки выглядит следующим образом:

#from multiprocessing import Pool, Manager
from pathos.multiprocessing import ProcessPool as Pool
from multiprocess import Manager
import dill

def train_pp(dnn, dict, man, pool):
    queue = man.Queue(dnn.NUM_GPUS)
    [queue.put(i) for i in range(dnn.NUM_GPUS)]
    all_acc = pool.map(dnn.train,[[(X_train,y_train),(X_eval,y_eval),queue,dict] for args])
    #queue.join()
    pool.close()
    pool.join()
    pool.clear()
    pool.restart()
    return all_acc, dict

if __name__ == "__main__":
    dnn = DNN()   
    pool = Pool(processes=dnn.NUM_GPUS)
    man = Manager()
    dict = man.dict()
    net_acc, dict = train_pp(dnn,dict,man,pool)

Каков наилучший способ одновременного обучения нескольких сетей?В чем проблема с многопроцессорной обработкой, которая показывает случайное поведение и как ее можно решить?

PS: дополнительный, не по теме вопрос: как должен осуществляться обмен данными при обучении всех сетей?по тем же данным?В настоящее время я считаю, что многопроцессорность создает несколько копий кода DNN, поэтому у каждого графического процессора есть свой итератор.Это лучший способ?Как получить данные о процессоре и обмениваться ими со всеми процессами без блокировок?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...