Распределенный тензорный поток: чем занимается главный рабочий? - PullRequest
0 голосов
/ 25 апреля 2018

Я использую версию примера распределенного тензорного потока https://www.tensorflow.org/deploy/distributed Вот мой код в "mnist_trainer.py".

import math
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
tf.logging.set_verbosity(tf.logging.INFO)

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
tf.app.flags.DEFINE_string("data_dir", "/home/anijsure/mnist_data",
                           "Directory for storing mnist data")
tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

FLAGS = tf.app.flags.FLAGS

IMAGE_PIXELS = 28

def main(_):
  print "Starting"
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  print "Cluster starting"
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  print "Server starting"
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    print "Job : WORKER"

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
      mytask = tf.constant(FLAGS.task_index, name="mytask")

      mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
      dataset = tf.data.Dataset.from_tensor_slices((mnist.train.images, mnist.train.labels))
      # Create batches of data
      dataset = dataset.batch(FLAGS.batch_size)
      # Create an iterator, to go over the dataset
      iterator = dataset.make_initializable_iterator()
      X,Y = iterator.get_next()

      # Variables of the hidden layer
      hid_w = tf.Variable(
          tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                              stddev=1.0 / IMAGE_PIXELS), name="hid_w")
      hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

      # Variables of the softmax layer
      sm_w = tf.Variable(
          tf.truncated_normal([FLAGS.hidden_units, 10],
                              stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
          name="sm_w")
      sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

      hid_lin = tf.nn.xw_plus_b(X, hid_w, hid_b)
      hid = tf.nn.relu(hid_lin)

      y = tf.nn.xw_plus_b(hid, sm_w, sm_b)
      loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y, logits=y), name="loss")

      global_step = tf.train.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)


    # The StopAtStepHook handles stopping after running given steps.
    chiefhooks=[tf.train.StopAtStepHook(num_steps=25)]
    allhooks=[tf.train.LoggingTensorHook(tensors={"Task": "mytask","loss":"loss", "Step":"global_step"}, every_n_iter=1)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs_%d" % FLAGS.task_index,
                                           hooks=allhooks, chief_only_hooks=chiefhooks) as mon_sess:
      mon_sess.run(iterator.initializer)
      while not mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.

        _ = mon_sess.run([train_op])

if __name__ == "__main__":
  tf.app.run()

Я запускаю его так:

HOSTS=<node0>:2222
WORKERS=<node1>:2222,<node1>:2223,<node1>:2224

python mnist_trainer.py --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=ps --task_index=0 &
python mnist_trainer.py --data_dir mnist_data --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=0 2>&1 | tee worker0.log &
python mnist_trainer.py --data_dir mnist_data_1 --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=1 2>&1 | tee worker1.log &
python mnist_trainer.py --data_dir mnist_data_2 --ps_hosts=$HOSTS --worker_hosts=$WORKERS --job_name=worker --task_index=2 2>&1 | tee worker2.log &

Я пробовал это с 1 PS и 2 или 3 работниками - оба узла являются процессорами.PS находится на узле 0, а рабочие все разные порты на узле 1.В случае с 2 или 3 работниками главный работник (работник задачи 0), по-видимому, вообще не делает никаких обновлений.Я установил StopatStepHook на 25 только для главного работника.Однако обучение останавливается на global_step = 549 для случая с двумя работниками и global_step = 1098 для случая с тремя работниками.Я печатаю рабочую задачу № с помощью LoggingTensorHook, и она показывает только запись задач 1 и 2, что-либо регистрирует.Только на последней итерации задача 0 регистрирует тензоры.

Это ожидаемое поведение?Должен ли главный работник отслеживать только сеанс мониторинга, контрольные точки и т. Д.?

Учитывая, что обучение останавливается на этом магическом числе 550 итеров, что-то на главном работнике действительно вызывает остановку.

Что делает главный работник и как он отслеживает шаг остановки?

Ответы [ 2 ]

0 голосов
/ 19 марта 2019

В соответствии с документацией TensorFlow для tf.estimator.train_and_evaluate:

… [T] главный работник также выполняет типовую учебную работу, аналогично другим работникам, не занимающимся обучением.(см. следующий абзац).В дополнение к обучению модели, он управляет некоторой дополнительной работой, например, сохранение и восстановление контрольной точки, написание резюме и т. Д.

0 голосов
/ 31 июля 2018

Обычно главный работник отвечает за initialize graph, save model checkpoint операций для учебного кластера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...