Я читаю страницы, относящиеся к распределенному обучению, в учебниках по Tensorflow и, в частности, смотрю на страницу Распределенный тензорный поток , а также пример, приведенный в Обучение для нескольких работников с использованием стратегий распределения , но объяснения несколько недостаточны или сбивают с толку. Мне интересно запустить простой, но полный пример распределенного обучения с использованием процессоров на разных машинах, и я думаю, что использование оценок, описанных в этих ссылках, - лучший способ сделать это.
Из моих чтений я понимаю, что я должен определить ps_host и некоторые worker_hosts и определить их, как описано в первой ссылке. Мой сценарий будет похожим, но я должен запускать его с разными аргументами командной строки на каждой машине. Я думаю, что могу использовать весь код, показанный в разделе «Собери все вместе», за исключением раздела elif FLAGS.job_name == "worker":
после строки with tf.device(...
, потому что он не использует оценщиков и нет кода для чтения каких-либо входных данных. Чтобы заменить его, я посмотрел на скрипт keras_model_to_estimator.py
во второй ссылке и вместо этого поместил код функции main()
(изменив MirroredStrategy на MultiWorkerMirroredStrategy, потому что у меня нет графического процессора и я хочу распределить его в некоторых процессорах ). Также я должен добавить функцию input_fn()
в мой скрипт.
Мой первый вопрос: правильный ли это подход или я что-то упустил или неправильно понял?
И второе: когда я запускаю код ниже, он говорит, что module 'tensorflow.contrib.distribute' has no attribute 'MultiWorkerMirroredStrategy'
. Как я могу использовать эту стратегию тогда? У меня нет GPU и я запускаю Tensorflow 1.13.
Мой код указан ниже:
import argparse
import sys
import tensorflow as tf
import numpy as np
FLAGS = None
def input_fn():
x = np.random.random((1024, 10))
y = np.random.randint(2, size=(1024, 1))
x = tf.cast(x, tf.float32)
dataset = tf.data.Dataset.from_tensor_slices((x, y))
dataset = dataset.repeat(100)
dataset = dataset.batch(32)
return dataset
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
model_dir = 'C:/Temp'
print('Using %s to store checkpoints.' % model_dir)
# Define a Keras Model.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(16, activation='relu', input_shape=(10,)))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))
# Compile the model.
optimizer = tf.train.GradientDescentOptimizer(0.2)
model.compile(loss='binary_crossentropy', optimizer=optimizer)
model.summary()
tf.keras.backend.set_learning_phase(True)
# Define DistributionStrategies and convert the Keras Model to an
# Estimator that utilizes these DistributionStrateges.
# Evaluator is a single worker, so using MirroredStrategy.
config = tf.estimator.RunConfig(
experimental_distribute=tf.contrib.distribute.DistributeConfig(
train_distribute=tf.contrib.distribute.CollectiveAllReduceStrategy(
num_gpus_per_worker=2),
eval_distribute=tf.contrib.distribute.MultiWorkerMirroredStrategy(
num_gpus_per_worker=2)))
keras_estimator = tf.keras.estimator.model_to_estimator(
keras_model=model, config=config, model_dir=model_dir)
# Train and evaluate the model. Evaluation will be skipped if there is not an
# "evaluator" job in the cluster.
tf.estimator.train_and_evaluate(
keras_estimator,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.register("type", "bool", lambda v: v.lower() == "true")
# Flags for defining the tf.train.ClusterSpec
parser.add_argument(
"--ps_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--worker_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--job_name",
type=str,
default="",
help="One of 'ps', 'worker'"
)
# Flags for defining the tf.train.Server
parser.add_argument(
"--task_index",
type=int,
default=0,
help="Index of task within the job"
)
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)