Я создал собственный алгоритм Sagemaker, основанный на этом примере: https://github.com/awslabs/amazon-sagemaker-examples/tree/master/advanced_functionality/tensorflow_bring_your_own
Я создал образ докера и отправил его в ECR, а также успешно создал алгоритм, но его проверка прошла успешноистекло время ожидания (1800 секунд для проверки алгоритма).Я запускал алгоритм локально, с реплицированной средой проверки в 1 шаг и размером набора данных 128 изображений (1 пакет).
Я не получаю ни ошибок, ни каких-либо событий регистрации в cloudwatch,но я вижу, что тренировка на Sagemaker занимает 30 минут, что является максимальным временем для проверки.Почему мой алгоритм обучения останавливается при обучении на Sagemaker, но когда я запускаю его локально, он завершается мгновенно, успешно?
Я использую идентичные сценарии «train» и «serve» из связанного примера, единственное отличие состоит в том, чтоскрипт моего алгоритма.
Вот код моего алгоритма:
import argparse
import functools
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.contrib.slim.nets import resnet_v1
slim = tf.contrib.slim
SIGNATURE_NAME = "serving_default"
WIDTH = 100
HEIGHT = 140
DEPTH = 3
def decode_image(image_file_names):
images = []
graph = tf.Graph()
with graph.as_default():
file_name = tf.placeholder(dtype=tf.string)
file = tf.read_file(file_name)
image = tf.image.decode_jpeg(file,3)
with tf.Session(graph=graph) as session:
tf.global_variables_initializer().run()
for i in range(len(image_file_names)):
images.append(session.run(image, feed_dict={file_name: image_file_names[i]}))
if (i+1) % 1000 == 0:
print('Images processed: ',i+1)
session.close()
return images
def load_images(data_dir):
train_image_file_names = [data_dir+i for i in os.listdir(data_dir)]
random.shuffle(train_image_file_names)
images = decode_image(train_image_file_names)
return np.asarray(images).astype('float32')
def encoder(inputs, hidden_units, dropout, is_training):
net = tf.to_float(inputs)
for num_hidden_units in hidden_units:
net = tf.contrib.layers.fully_connected(
net, num_outputs=num_hidden_units, activation_fn=tf.nn.relu)
if dropout is not None:
net = slim.dropout(net, is_training=is_training)
return net
def decoder(inputs, hidden_units, dropout, is_training):
net = inputs
for num_hidden_units in hidden_units[:-1]:
net = tf.contrib.layers.fully_connected(
net, num_outputs=num_hidden_units)
if dropout is not None:
net = slim.dropout(net, is_training=is_training)
net = tf.contrib.layers.fully_connected(net, hidden_units[-1],
activation_fn=tf.nn.relu)
return net
def autoencoder(inputs, hidden_units, activation_fn, dropout, weight_decay, mode):
is_training = mode == tf.estimator.ModeKeys.TRAIN
weights_init = slim.initializers.variance_scaling_initializer()
if weight_decay is None:
weights_regularizer = None
else:
weights_reg = tf.contrib.layers.l2_regularizer(weight_decay)
with slim.arg_scope([tf.contrib.layers.fully_connected],
weights_initializer=weights_init,
weights_regularizer=weights_reg,
activation_fn=activation_fn):
encoded = encoder(inputs, hidden_units, dropout, is_training)
n_features = inputs.shape[1].value
decoder_units = hidden_units[:-1][::-1] + [n_features]
net = decoder(encoded, decoder_units, dropout, is_training)
return net, encoded
def model_fn(features, labels, mode, params):
if isinstance(features, dict):
features = features['feature']
is_training = mode == tf.estimator.ModeKeys.TRAIN
logits, encoded = autoencoder(inputs=features,
hidden_units=params['hidden_units'],
activation_fn=tf.nn.relu,
dropout=None,
weight_decay=1e-5,
mode=mode)
predictions = {"encoding": encoded}
if mode == tf.estimator.ModeKeys.PREDICT:
export_outputs = {
SIGNATURE_NAME: tf.estimator.export.PredictOutput(predictions)
}
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
export_outputs=export_outputs)
tf.losses.mean_squared_error(labels, logits)
total_loss = tf.losses.get_total_loss(add_regularization_losses=is_training)
train_op = tf.contrib.layers.optimize_loss(
loss=total_loss,
optimizer="Adam",
learning_rate=.001,
learning_rate_decay_fn=lambda lr, gs: tf.train.exponential_decay(lr, gs, 1000, 0.96, staircase=True),
global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=total_loss,
train_op=train_op)
def serving_input_fn():
"""
Serving input function for CIFAR-10. Specifies the input format the caller of predict() will have to provide.
For more information: https://www.tensorflow.org/guide/saved_model#build_and_load_a_savedmodel
"""
inputs = {'feature': tf.placeholder(tf.float32, [None, WIDTH * HEIGHT * DEPTH])}
return tf.estimator.export.ServingInputReceiver(inputs, inputs)
def train(model_dir, data_dir, train_steps, images, parameters,batch_size):
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir, params=parameters)
my_input_fn = tf.estimator.inputs.numpy_input_fn(
x = images,
y = images,
shuffle=False,
num_epochs=train_steps,
batch_size=batch_size)
eval_input_fn = tf.estimator.inputs.numpy_input_fn(
x = images,
y = images,
shuffle=True,
batch_size=1)
train_spec = tf.estimator.TrainSpec(my_input_fn)
exporter = tf.estimator.LatestExporter('Servo', serving_input_receiver_fn=serving_input_fn)
eval_spec = tf.estimator.EvalSpec(eval_input_fn, steps=1, exporters=exporter)
try:
os.makedirs(model_dir + '/export/Servo/')
except:
print("except")
tf.estimator.train_and_evaluate(estimator=estimator, train_spec=train_spec, eval_spec=eval_spec)
def str_to_intarr(arr):
strarr = arr.split(',')
arr = []
for str in strarr:
arr.append(int(str))
return arr
def main(model_dir, data_dir, train_steps, network_dim, image_dim, batch_size):
tf.logging.set_verbosity(tf.logging.INFO)
image_dim_arr = str_to_intarr(image_dim)
parameters = {'hidden_units': str_to_intarr(network_dim), 'image_dim': image_dim_arr, 'image_size': image_dim_arr[0]* image_dim_arr[1]* image_dim_arr[2]}
WIDTH = image_dim_arr[0]
HEIGHT = image_dim_arr[1]
DEPTH = image_dim_arr[2]
images = load_images(data_dir)
images = images.reshape([len(images),parameters['image_size']])
train(model_dir, data_dir, train_steps, images, parameters,batch_size)
if __name__ == '__main__':
args_parser = argparse.ArgumentParser()
args_parser.add_argument(
'--data-dir',
default='/opt/ml/input/data/train/',
type=str,
help='The directory where the CIFAR-10 input data is stored. Default: /opt/ml/input/data/training. This '
'directory corresponds to the SageMaker channel named \'training\', which was specified when creating '
'our training job on SageMaker')
args_parser.add_argument(
'--model-dir',
default='/opt/ml/model/',
type=str,
help='The directory where the model will be stored. Default: /opt/ml/model. This directory should contain all '
'final model artifacts as Amazon SageMaker copies all data within this directory as a single object in '
'compressed tar format.')
args_parser.add_argument(
'--train-steps',
type=int,
default=100,
help='The number of steps to use for training.')
args_parser.add_argument(
'--network-dim',
type=str,
default='1024,512,128',
help='Comma separated list of ints, used for the encoder dimensions. The decoder dimensions are the reverse.')
args_parser.add_argument(
'--image-dim',
type=str,
default='100,140,3',
help='Comma separated list of ints, used for image dimensions. Format is "width,height,channels"')
args_parser.add_argument(
'--batch-size',
type=int,
default=128,
help='Batch size for training.')
args = args_parser.parse_args()
main(**vars(args))