Я реализовал репликацию между графами и асинхронное обучение в соответствии с примером 'https://www.tensorflow.org/deploy/distributed’.
Затем я установил два сервера и одного работника следующим образом.
pythondnn.py --ps_hosts = localhost: 19000, localhost: 18000 --worker_hosts = localhost: 11000 --job_name = ps --task_index = 0
python dnn.py --ps_hosts = localhost: 19000, localhost: 18000 --worker_hosts = localhost: 11000 --job_name = ps --task_index = 1
python dnn.py --ps_hosts = localhost: 19000, localhost: 18000 --worker_hosts = localhost: 11000 --job_name= worker --task_index = 0
У меня есть три вопроса для распределенного тензорного потока.
Во-первых, согласно временной шкале тензорного потока моей программы, как показано ниже, все вычисления и обновление переменныхоперации выполняются на ps узлах , пока рабочий узел не используется.Это сбивает меня с толку, так как я думал, что вычислительная операция должна выполняться на рабочих узлах, а не на узлах ps.Кто-нибудь поможет мне в этом?
распределенная временная шкала тензорного потока
Во-вторых, только ЦП назначается серверам с помощью tf.train.replica_device_setter inмоя программаОднако операции выполняются как на CPU, так и на GPU.Как правильно назначить процессоры / графические процессоры серверам?
И последнее, но не менее важное: если я запустил два сервера и три рабочих, сохранят ли два сервера одну и ту же копию параметров?Также мне интересно, обновят ли три рабочих градиенты одного и того же графика.Кто-нибудь мне скажет?
ps Я назначил устройства как серверам, так и работникам, используя tf.train.replica_device_setter.Однако в примере (https://www.tensorflow.org/deploy/distributed), нет назначения устройств локальному серверу. В моем случае, если я не назначил устройства локальному серверу, будут ошибки, такие как:
«Операциябыл явно назначен на / job: ps / task: 0, но доступны следующие устройства [/ job: localhost / replica: 0 / task: 0 / device: CPU: 0, / job: localhost / replica: 0 / task: 0 / device: GPU: 0, / job: localhost / replica: 0 / task: 0 / device: GPU: 1 ...]. Убедитесь, что в спецификации устройства указано допустимое устройство. ”
Мой код:
def train():
tl = TimeLiner()
#get current servers
ps_hosts = FLAGS.ps_hosts.split(",")
#get current workers
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts,
"worker": worker_hosts})
graph_options = tf.GraphOptions(enable_bfloat16_sendrecv=True)
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.3, allow_growth=True)
config = tf.ConfigProto(graph_options=graph_options, gpu_options=gpu_options, log_device_placement=False,
allow_soft_placement=False)
#start a server
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index,
config=config)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.variable_scope(tf.get_variable_scope()):
with tf.device(tf.train.replica_device_setter(ps_device="/job:localhost/replica:0/task:%d/device:CPU:0" % FLAGS.task_index,
worker_device="/job:localhost/replica:0/task:%d/device:GPU:0" % FLAGS.task_index,
cluster=cluster)):
loss = ...
global_step = tf.train.get_or_create_global_step()
train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)
sys.stdout.flush()
init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
summary_op = tf.summary.merge_all()
hooks = [tf.train.StopAtStepHook(last_step=FLAGS.max_steps)]
total_training = 0
graph_options = tf.GraphOptions(enable_bfloat16_sendrecv=True)
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.9, allow_growth=True)
config = tf.ConfigProto(graph_options=graph_options, gpu_options=gpu_options, log_device_placement=False,
allow_soft_placement=True)
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir=FLAGS.log_dir,
log_step_count_steps=100000,
hooks=hooks,
config=config) as mon_sess:
mon_sess.run(init_op)
options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
while not mon_sess.should_stop():
# run a training step asynchronously
[_, tot_loss, step, summary] = mon_sess.run([train_op, loss, global_step, summary_op],
options=options,
run_metadata=run_metadata)
fetched_timeline = timeline.Timeline(run_metadata.step_stats)
chrome_trace = fetched_timeline.generate_chrome_trace_format()
tl.update_timeline(chrome_trace)
tl.save('timeline.json')
Заранее спасибо!
Ин