Распределенное обучение в тензорном потоке с оценщиком - PullRequest
0 голосов
/ 28 мая 2019

Я читаю страницы, относящиеся к распределенному обучению, в учебниках по Tensorflow и, в частности, смотрю на страницу Распределенный тензорный поток , а также пример, приведенный в Обучение для нескольких работников с использованием стратегий распределения , но объяснения несколько недостаточны или сбивают с толку. Мне интересно запустить простой, но полный пример распределенного обучения с использованием процессоров на разных машинах, и я думаю, что использование оценок, описанных в этих ссылках, - лучший способ сделать это. Из моих чтений я понимаю, что я должен определить ps_host и некоторые worker_hosts и определить их, как описано в первой ссылке. Мой сценарий будет похожим, но я должен запускать его с разными аргументами командной строки на каждой машине. Я думаю, что могу использовать весь код, показанный в разделе «Собери все вместе», за исключением раздела elif FLAGS.job_name == "worker": после строки with tf.device(..., потому что он не использует оценщиков и нет кода для чтения каких-либо входных данных. Чтобы заменить его, я посмотрел на скрипт keras_model_to_estimator.py во второй ссылке и вместо этого поместил код функции main() (изменив MirroredStrategy на MultiWorkerMirroredStrategy, потому что у меня нет графического процессора и я хочу распределить его в некоторых процессорах ). Также я должен добавить функцию input_fn() в мой скрипт.

Мой первый вопрос: правильный ли это подход или я что-то упустил или неправильно понял?

И второе: когда я запускаю код ниже, он говорит, что module 'tensorflow.contrib.distribute' has no attribute 'MultiWorkerMirroredStrategy'. Как я могу использовать эту стратегию тогда? У меня нет GPU и я запускаю Tensorflow 1.13.

Мой код указан ниже:

import argparse
import sys

import tensorflow as tf
import numpy as np

FLAGS = None

def input_fn():
  x = np.random.random((1024, 10))
  y = np.random.randint(2, size=(1024, 1))
  x = tf.cast(x, tf.float32)
  dataset = tf.data.Dataset.from_tensor_slices((x, y))
  dataset = dataset.repeat(100)
  dataset = dataset.batch(32)
  return dataset

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

        model_dir = 'C:/Temp'
        print('Using %s to store checkpoints.' % model_dir)

        # Define a Keras Model.
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.Dense(16, activation='relu', input_shape=(10,)))
        model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

        # Compile the model.
        optimizer = tf.train.GradientDescentOptimizer(0.2)
        model.compile(loss='binary_crossentropy', optimizer=optimizer)
        model.summary()
        tf.keras.backend.set_learning_phase(True)

        # Define DistributionStrategies and convert the Keras Model to an
        # Estimator that utilizes these DistributionStrateges.
        # Evaluator is a single worker, so using MirroredStrategy.
        config = tf.estimator.RunConfig(
            experimental_distribute=tf.contrib.distribute.DistributeConfig(
                train_distribute=tf.contrib.distribute.CollectiveAllReduceStrategy(
                    num_gpus_per_worker=2),
                eval_distribute=tf.contrib.distribute.MultiWorkerMirroredStrategy(
                    num_gpus_per_worker=2)))
        keras_estimator = tf.keras.estimator.model_to_estimator(
            keras_model=model, config=config, model_dir=model_dir)

        # Train and evaluate the model. Evaluation will be skipped if there is not an
        # "evaluator" job in the cluster.
        tf.estimator.train_and_evaluate(
            keras_estimator,
            train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
            eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
...