Я пытаюсь настроить действительно простой минимальный рабочий пример для следующего: Использовать модель, созданную с помощью tf.keras, в tf.train.MonitoredSession, используя tf.Server с tf.distribute.ParameterServerStrategy.
В итоге моя цель - использовать модель tf.keras в распределенной среде, используя двух рабочих, каждый из которых имеет один графический процессор и сервер параметров.
Модель построена в соответствии с примером и найденной документацией.здесь: https://www.tensorflow.org/versions/r1.12/api_docs/python/tf/keras/models/Sequential
Стратегия сервера параметров используется в соответствии с документацией, найденной здесь: https://www.tensorflow.org/versions/r1.12/api_docs/python/tf/contrib/distribute/ParameterServerStrategy
Общая настройка, включая размещение устройства и использование MonitoredSession, взята из:https://github.com/tensorflow/examples/blob/master/community/en/docs/deploy/distributed.md
Я уже использую опцию allow_soft_placement и эмулирую распределенную настройку на моей локальной машине, имеющей только один процессор, поскольку в реальной распределенной настройке существуют разные проблемы, которыеЯ пытаюсь решить с помощью MonitoredSession, где инициализация переменной обрабатывается автоматически.
Этот код работает с "нормальной" (не отслеживаемой) tf.Сеанс и инициализация переменных - глобальные, локальные, переменные модели, таблицы и т. Д.
Строка, которая размораживает график, должна бытьвозможность использовать tf.data.Dataset в функции подгонки tf.keras.Model, так как должен быть создан итератор, что вызывает ошибку в замороженном графике.
Этот код я пытаюсьзапустить.Я использую tenorflow 1.12.0 и python 3.6.7.Я также пробовал Python 2.7, тот же результат.
Код не требует настройки, кроме установки tenorflow.
import sys
import tensorflow as tf
def main(argv):
# Create local cluster config for run_local_server.sh script.
cluster = tf.train.ClusterSpec({"worker": ["localhost:2222"], "ps": ["localhost:2223"]})
task = 0
job = str(argv[0])
# Number of GPUs per worker
GPU_PER_WORKER = 0
config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
server = tf.train.Server(cluster, job_name=job, task_index=task,config=config)
strategy = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=GPU_PER_WORKER)
strategy.configure(session_config=config, cluster_spec=cluster,task_type=job,task_id=task)
with strategy.scope():
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255
# Reshape input data from (28, 28) to (28, 28, 1)
w, h = 28, 28
x_train = x_train.reshape(x_train.shape[0], w, h, 1)
x_test = x_test.reshape(x_test.shape[0], w, h, 1)
# One-hot encode the labels
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)
train_ds = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(x_train),tf.data.Dataset.from_tensor_slices(y_train))).repeat().shuffle(60000).batch(10)
val_ds = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(x_test),tf.data.Dataset.from_tensor_slices(y_test))).repeat().shuffle(10000).batch(10)
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task,cluster=cluster)):
model = tf.keras.models.Sequential()
conv0 = tf.keras.layers.Conv2D(filters=16, data_format='channels_last', padding="valid", kernel_size=4, strides=1, input_shape=(28,28,1), activation=tf.keras.activations.relu)
model.add(conv0)
flatten = tf.keras.layers.Flatten()
model.add(flatten)
dense1 = tf.keras.layers.Dense(10, activation=tf.keras.activations.softmax)
model.add(dense1)
model.compile(tf.contrib.optimizer_v2.AdamOptimizer(0.001), loss=tf.keras.metrics.mean_absolute_error,metrics=['accuracy'],distribute=strategy)
if job == "ps":
server.join()
elif job == "worker":
with tf.train.MonitoredSession(session_creator=tf.train.ChiefSessionCreator(master=server.target,config=config)) as sess:
sess.graph._unsafe_unfinalize()
history = model.fit(x=train_ds, validation_data=val_ds, validation_steps=1000, steps_per_epoch=100, epochs=60)
if __name__ == "__main__":
main(sys.argv[1:])
Код не требует расширенной настройки, так как набор данных загружается из Интернета ипреобразован в tf.data.Dataset, так как я хочу организовать свой конвейер с реальными данными.Пример настройки данных MNIST взят из https://www.kaggle.com/margaretmz/mnist-with-tf-keras.
Я ожидаю, что код не потерпит неудачу из-за неправильной переменной или размещения операции, так как я в основном оставляю все эти решения для реализации тензорного потока с помощью strategy.scope()
и tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task,cluster=cluster))