SageMaker algorithm timeout
0 votes / 09 April 2019

I created a custom SageMaker algorithm based on this example: https://github.com/awslabs/amazon-sagemaker-examples/tree/master/advanced_functionality/tensorflow_bring_your_own

I built a Docker image and pushed it to ECR, and the algorithm itself was created successfully, but its validation step timed out (1800 seconds for algorithm validation). I had already run the algorithm locally, replicating the validation environment: 1 training step and a dataset of 128 images (1 batch).

I get no errors and no log events in CloudWatch, but I can see that the training run on SageMaker takes 30 minutes, which is the maximum time allowed for validation. Why does my training algorithm stall when it trains on SageMaker, yet finish almost instantly and successfully when I run it locally?
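For debugging, here is a sketch of how the Estimator could be made to log progress periodically, so that CloudWatch shows activity while the job runs (log_step_count_steps and save_summary_steps are standard tf.estimator.RunConfig options; nothing else here is from my script). Note also that print output from inside a container may be buffered unless the Dockerfile sets PYTHONUNBUFFERED, as the bring-your-own examples do:

# Sketch (not in the script below): log step counts and summaries every
# few steps so something appears in CloudWatch during training.
run_config = tf.estimator.RunConfig(
    model_dir=model_dir,          # same model_dir the script already uses
    log_step_count_steps=10,      # log global_step/sec every 10 steps
    save_summary_steps=10)
estimator = tf.estimator.Estimator(
    model_fn=model_fn, params=parameters, config=run_config)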

I use the "train" and "serve" scripts from the linked example unchanged; the only difference is my algorithm script.
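A related sanity check, sketched with the sagemaker Python SDK (the image URI, role, and data path are placeholders, not values from my setup): SageMaker local mode runs the pushed container on the local machine with the same /opt/ml layout as a real training job, which is closer to the real environment than calling the script directly.

# Sketch: run the same container locally via SageMaker local mode.
from sagemaker.estimator import Estimator

est = Estimator(
    image_name='<account>.dkr.ecr.<region>.amazonaws.com/my-algo:latest',  # placeholder
    role='<execution-role-arn>',                                           # placeholder
    train_instance_count=1,
    train_instance_type='local')  # 'local' runs the container on this machine
est.fit({'train': 'file:///path/to/images/'})  # 'train' channel -> /opt/ml/input/data/train/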

Here is my algorithm's code:

import argparse
import functools
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.contrib.slim.nets import resnet_v1
slim = tf.contrib.slim

SIGNATURE_NAME = "serving_default"

WIDTH = 100
HEIGHT = 140
DEPTH = 3

def decode_image(image_file_names):
    # Decode each JPEG on disk into a numpy array, reusing a single graph.
    images = []
    graph = tf.Graph()
    with graph.as_default():
        file_name = tf.placeholder(dtype=tf.string)
        file = tf.read_file(file_name)
        image = tf.image.decode_jpeg(file, channels=3)

    with tf.Session(graph=graph) as session:
        tf.global_variables_initializer().run()
        for i in range(len(image_file_names)):
            images.append(session.run(image, feed_dict={file_name: image_file_names[i]}))
            if (i + 1) % 1000 == 0:
                print('Images processed: ', i + 1)
        # The `with` block closes the session; no explicit close() is needed.

    return images
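
# Sketch (not part of the original script): an equivalent tf.data pipeline
# that keeps decoding inside the graph instead of doing one session.run()
# round trip per file, which matters for datasets larger than 128 images.
def decode_image_dataset(image_file_names):
    dataset = tf.data.Dataset.from_tensor_slices(tf.constant(image_file_names))
    dataset = dataset.map(
        lambda f: tf.image.decode_jpeg(tf.read_file(f), channels=3))
    return dataset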

def load_images(data_dir):
    # os.path.join is safe whether or not data_dir has a trailing slash.
    train_image_file_names = [os.path.join(data_dir, i) for i in os.listdir(data_dir)]
    random.shuffle(train_image_file_names)
    images = decode_image(train_image_file_names)
    return np.asarray(images).astype('float32')

def encoder(inputs, hidden_units, dropout, is_training):
    net = tf.to_float(inputs)
    for num_hidden_units in hidden_units:
        net = tf.contrib.layers.fully_connected(
            net, num_outputs=num_hidden_units, activation_fn=tf.nn.relu)
        if dropout is not None:
            net = slim.dropout(net, is_training=is_training)
    return net

def decoder(inputs, hidden_units, dropout, is_training):
    net = inputs
    for num_hidden_units in hidden_units[:-1]:
        net = tf.contrib.layers.fully_connected(
            net, num_outputs=num_hidden_units)
        if dropout is not None:
            net = slim.dropout(net, is_training=is_training)

    net = tf.contrib.layers.fully_connected(net, hidden_units[-1],
                                            activation_fn=tf.nn.relu)
    return net

def autoencoder(inputs, hidden_units, activation_fn, dropout, weight_decay, mode):
    is_training = mode == tf.estimator.ModeKeys.TRAIN

    weights_init = slim.initializers.variance_scaling_initializer()
    if weight_decay is None:
        weights_regularizer = None
    else:
        weights_regularizer = tf.contrib.layers.l2_regularizer(weight_decay)

    with slim.arg_scope([tf.contrib.layers.fully_connected],
                        weights_initializer=weights_init,
                        weights_regularizer=weights_regularizer,
                        activation_fn=activation_fn):
        encoded = encoder(inputs, hidden_units, dropout, is_training)
        n_features = inputs.shape[1].value
        decoder_units = hidden_units[:-1][::-1] + [n_features]
        net = decoder(encoded, decoder_units, dropout, is_training)
    return net, encoded
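
# Shape sketch for the defaults used below: a 100x140x3 image flattens to
# 42000 features; with hidden_units=[1024, 512, 128] the encoder output is
# [batch, 128] and the decoder mirrors back to [batch, 42000].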


def model_fn(features, labels, mode, params):
    if isinstance(features, dict):
        features = features['feature']

    is_training = mode == tf.estimator.ModeKeys.TRAIN

    logits, encoded = autoencoder(inputs=features,
                                  hidden_units=params['hidden_units'],
                                  activation_fn=tf.nn.relu,
                                  dropout=None,
                                  weight_decay=1e-5,
                                  mode=mode)

    predictions = {"encoding": encoded}

    if mode == tf.estimator.ModeKeys.PREDICT:
        export_outputs = {
            SIGNATURE_NAME: tf.estimator.export.PredictOutput(predictions)
        }
        return tf.estimator.EstimatorSpec(
            mode=mode,
            predictions=predictions,
            export_outputs=export_outputs)

    # mean_squared_error adds the MSE to the tf.GraphKeys.LOSSES collection;
    # get_total_loss() below sums it with any regularization losses.
    tf.losses.mean_squared_error(labels, logits)
    total_loss = tf.losses.get_total_loss(add_regularization_losses=is_training)

    train_op = tf.contrib.layers.optimize_loss(
        loss=total_loss,
        optimizer="Adam",
        learning_rate=.001,
        learning_rate_decay_fn=lambda lr, gs: tf.train.exponential_decay(lr, gs, 1000, 0.96, staircase=True),
        global_step=tf.train.get_global_step())

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=total_loss,
        train_op=train_op)
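
# For reference: with exponential_decay(lr, gs, 1000, 0.96, staircase=True)
# the learning rate is 0.001 * 0.96 ** (global_step // 1000), i.e. 0.00096
# after 1000 steps.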

def serving_input_fn():
    """
    Serving input function for CIFAR-10. Specifies the input format the caller of predict() will have to provide.
    For more information: https://www.tensorflow.org/guide/saved_model#build_and_load_a_savedmodel
    """
    inputs = {'feature': tf.placeholder(tf.float32, [None, WIDTH * HEIGHT * DEPTH])}
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)


def train(model_dir, data_dir, train_steps, images, parameters, batch_size):

    estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir, params=parameters)

    my_input_fn = tf.estimator.inputs.numpy_input_fn(
            x = images,
            y = images,
            shuffle=False,
            num_epochs=train_steps,
            batch_size=batch_size)
    eval_input_fn = tf.estimator.inputs.numpy_input_fn(
            x = images,
            y = images,
            shuffle=True,
            batch_size=1)

    # Note: TrainSpec is created without max_steps, so train_and_evaluate
    # stops only when the training input_fn is exhausted, i.e. after
    # num_epochs (= train_steps) passes over the data.
    train_spec = tf.estimator.TrainSpec(my_input_fn)
    exporter = tf.estimator.LatestExporter('Servo', serving_input_receiver_fn=serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=1, exporters=exporter)

    try:
        os.makedirs(model_dir + '/export/Servo/')
    except OSError:
        # The export directory may already exist; that is fine.
        print('export directory already exists')

    tf.estimator.train_and_evaluate(estimator=estimator, train_spec=train_spec, eval_spec=eval_spec)

def str_to_intarr(arr):
    # Parse a comma separated string like '1024,512,128' into a list of ints.
    return [int(s) for s in arr.split(',')]

def main(model_dir, data_dir, train_steps, network_dim, image_dim, batch_size):
    tf.logging.set_verbosity(tf.logging.INFO)

    image_dim_arr = str_to_intarr(image_dim)

    parameters = {
        'hidden_units': str_to_intarr(network_dim),
        'image_dim': image_dim_arr,
        'image_size': image_dim_arr[0] * image_dim_arr[1] * image_dim_arr[2],
    }

    # Update the module-level constants so serving_input_fn sees the
    # dimensions passed on the command line; plain assignments here would
    # only create locals that shadow them.
    global WIDTH, HEIGHT, DEPTH
    WIDTH = image_dim_arr[0]
    HEIGHT = image_dim_arr[1]
    DEPTH = image_dim_arr[2]

    images = load_images(data_dir)
    images = images.reshape([len(images), parameters['image_size']])
    train(model_dir, data_dir, train_steps, images, parameters, batch_size)

if __name__ == '__main__':
    args_parser = argparse.ArgumentParser()    
    args_parser.add_argument(
        '--data-dir',
        default='/opt/ml/input/data/train/',
        type=str,
        help='The directory where the input images are stored. Default: /opt/ml/input/data/train/. This '
             'directory corresponds to the SageMaker channel named \'train\', which was specified when creating '
             'our training job on SageMaker')
    args_parser.add_argument(
        '--model-dir',
        default='/opt/ml/model/',
        type=str,
        help='The directory where the model will be stored. Default: /opt/ml/model. This directory should contain all '
             'final model artifacts as Amazon SageMaker copies all data within this directory as a single object in '
             'compressed tar format.')
    args_parser.add_argument(
        '--train-steps',
        type=int,
        default=100,
        help='The number of steps to use for training.')
    args_parser.add_argument(
        '--network-dim',
        type=str,
        default='1024,512,128',
        help='Comma separated list of ints, used for the encoder dimensions. The decoder dimensions are the reverse.')
    args_parser.add_argument(
        '--image-dim',
        type=str,
        default='100,140,3',
        help='Comma separated list of ints, used for image dimensions. Format is "width,height,channels"')
    args_parser.add_argument(
        '--batch-size',
        type=int,
        default=128,
        help='Batch size for training.')
    args = args_parser.parse_args()
    main(**vars(args))