Как использовать tf.keras Sequential с tf.distribute.ParameterServerStrategy и tf.train.MonitoredSession? - PullRequest
0 голосов
/ 17 апреля 2019

Я пытаюсь настроить действительно простой минимальный рабочий пример для следующего: Использовать модель, созданную с помощью tf.keras, в tf.train.MonitoredSession, используя tf.Server с tf.distribute.ParameterServerStrategy.

В итоге моя цель - использовать модель tf.keras в распределенной среде, используя двух рабочих, каждый из которых имеет один графический процессор и сервер параметров.

Модель построена в соответствии с примером и найденной документацией.здесь: https://www.tensorflow.org/versions/r1.12/api_docs/python/tf/keras/models/Sequential

Стратегия сервера параметров используется в соответствии с документацией, найденной здесь: https://www.tensorflow.org/versions/r1.12/api_docs/python/tf/contrib/distribute/ParameterServerStrategy

Общая настройка, включая размещение устройства и использование MonitoredSession, взята из:https://github.com/tensorflow/examples/blob/master/community/en/docs/deploy/distributed.md

Я уже использую опцию allow_soft_placement и эмулирую распределенную настройку на моей локальной машине, имеющей только один процессор, поскольку в реальной распределенной настройке существуют разные проблемы, которыеЯ пытаюсь решить с помощью MonitoredSession, где инициализация переменной обрабатывается автоматически.

Этот код работает с "нормальной" (не отслеживаемой) tf.Сеанс и инициализация переменных - глобальные, локальные, переменные модели, таблицы и т. Д.

Строка, которая размораживает график, должна бытьвозможность использовать tf.data.Dataset в функции подгонки tf.keras.Model, так как должен быть создан итератор, что вызывает ошибку в замороженном графике.

Этот код я пытаюсьзапустить.Я использую tenorflow 1.12.0 и python 3.6.7.Я также пробовал Python 2.7, тот же результат.

Код не требует настройки, кроме установки tenorflow.

import sys
import tensorflow as tf

def main(argv):

  # Create local cluster config for run_local_server.sh script.
  cluster = tf.train.ClusterSpec({"worker": ["localhost:2222"], "ps": ["localhost:2223"]})
  task = 0
  job = str(argv[0])

  # Number of GPUs per worker
  GPU_PER_WORKER = 0

  config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
  server = tf.train.Server(cluster, job_name=job, task_index=task,config=config)

  strategy = tf.contrib.distribute.ParameterServerStrategy(num_gpus_per_worker=GPU_PER_WORKER)
  strategy.configure(session_config=config, cluster_spec=cluster,task_type=job,task_id=task)

  with strategy.scope():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255

    # Reshape input data from (28, 28) to (28, 28, 1)
    w, h = 28, 28
    x_train = x_train.reshape(x_train.shape[0], w, h, 1)
    x_test = x_test.reshape(x_test.shape[0], w, h, 1)

    # One-hot encode the labels
    y_train = tf.keras.utils.to_categorical(y_train, 10)
    y_test = tf.keras.utils.to_categorical(y_test, 10)

    train_ds = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(x_train),tf.data.Dataset.from_tensor_slices(y_train))).repeat().shuffle(60000).batch(10)
    val_ds = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(x_test),tf.data.Dataset.from_tensor_slices(y_test))).repeat().shuffle(10000).batch(10)

    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task,cluster=cluster)):
      model = tf.keras.models.Sequential()

      conv0 = tf.keras.layers.Conv2D(filters=16, data_format='channels_last', padding="valid", kernel_size=4, strides=1, input_shape=(28,28,1), activation=tf.keras.activations.relu)
      model.add(conv0)

      flatten = tf.keras.layers.Flatten()
      model.add(flatten)

      dense1 = tf.keras.layers.Dense(10, activation=tf.keras.activations.softmax)
      model.add(dense1)

      model.compile(tf.contrib.optimizer_v2.AdamOptimizer(0.001), loss=tf.keras.metrics.mean_absolute_error,metrics=['accuracy'],distribute=strategy)   

    if job == "ps":
      server.join()
    elif job == "worker":
      with tf.train.MonitoredSession(session_creator=tf.train.ChiefSessionCreator(master=server.target,config=config)) as sess:        
        sess.graph._unsafe_unfinalize()
        history = model.fit(x=train_ds, validation_data=val_ds, validation_steps=1000, steps_per_epoch=100, epochs=60)

if __name__ == "__main__":
  main(sys.argv[1:])

Код не требует расширенной настройки, так как набор данных загружается из Интернета ипреобразован в tf.data.Dataset, так как я хочу организовать свой конвейер с реальными данными.Пример настройки данных MNIST взят из https://www.kaggle.com/margaretmz/mnist-with-tf-keras.

Я ожидаю, что код не потерпит неудачу из-за неправильной переменной или размещения операции, так как я в основном оставляю все эти решения для реализации тензорного потока с помощью strategy.scope()и tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task,cluster=cluster))

...