collective_ops.all_reduce in tensorflow.python.ops is very useful for aggregating gradients in a distributed setting. I tried it in a standalone snippet and it works fine. When the same method is embedded in my project it also works, but the final value differs slightly from the standalone snippet: 0.70491827 versus 0.705088. In addition, worker0 keeps printing extra "Skipping rendezvous re-initialization" messages, even though worker1 was set as the collective_group_leader. I wonder how this problem arises, and whether there is any hint on how to fix it. The code runs on TensorFlow 1.13 with Python 3.6.8.
The standalone sample, which works without problems:
"""Illustrate all reduce using collective_ops
"""
MP_METHOD = 'fork' # 'fork' (UNIX), 'spawn' (WINDOWS);
NUM_PROCESSES = 2
def process_fn(worker_hosts, task_index):
"""allreduce process"""
import os
import time
import tensorflow as tf
from tensorflow.python.ops import collective_ops
os.environ['CUDA_VISIBLE_DEVICES'] = str(task_index)
cluster_spec = tf.train.ClusterSpec({'worker': worker_hosts})
config = tf.ConfigProto()
config.experimental.collective_group_leader = '/job:worker/replica:0/task:0'
server = tf.train.Server(cluster_spec, config=config,
job_name='worker', task_index=task_index)
with tf.Graph().as_default():
instance_count = 0
with tf.variable_scope('worker{}'.format(task_index)):
device = '/job:worker/replica:0/task:{}/device:CPU:0'.format(
task_index)
with tf.device(device):
weight = tf.get_variable('weight', dtype=tf.float32,
initializer=0.1)
loss = weight * weight - 2 * weight
trainable_variables = [weight]
gradients = tf.gradients(loss, trainable_variables)
reduced_gradients = list()
group_size = NUM_PROCESSES
group_key = NUM_PROCESSES
instance_key = 2
print('allreduce : {:4d}, {:4d}, {:4d}'.format(
group_size, group_key, instance_key))
for gradient in gradients:
reduced_gradients.append(collective_ops.all_reduce(
gradient, group_size, group_key, instance_key,
'Add', 'Div'))
instance_key += 1
optimizer = tf.train.GradientDescentOptimizer(0.1)
train_op = optimizer.apply_gradients(zip(
reduced_gradients, trainable_variables))
session_creator = tf.train.ChiefSessionCreator(master=server.target)
with tf.train.MonitoredSession(session_creator=session_creator) \
as mon_sess:
result = mon_sess.run(weight)
print('task {} sense {}'.format(task_index, result))
for gstep in range(5):
result, _ = mon_sess.run([trainable_variables, train_op])
print('task {} get {}'.format(task_index, result))
result = mon_sess.run(trainable_variables)
print('task {} get {}'.format(task_index, result))
time.sleep(0.1)
def start_process():
"""start process"""
import time
import multiprocessing as mp
port = 60000
host_fmt = 'localhost:{}'
worker_hosts = list()
for process_index in range(NUM_PROCESSES):
worker_hosts.append(host_fmt.format(port + process_index))
mp_ctx = mp.get_context(MP_METHOD)
processes = list()
for process_index in range(NUM_PROCESSES):
process = mp_ctx.Process(target=process_fn,
args=(worker_hosts, process_index,))
processes.append(process)
process.start()
time.sleep(0.1)
for process in processes:
process.join()
if __name__ == '__main__':
start_process()
Sample output:
task 1 sense 0.10000000149011612
task 0 sense 0.10000000149011612
task 1 get [0.28]
task 0 get [0.28]
task 1 get [0.42400002]
task 0 get [0.42400002]
task 1 get [0.5392]
task 0 get [0.5392]
task 1 get [0.63136]
task 0 get [0.63136]
task 1 get [0.705088]
task 0 get [0.705088]
task 1 get [0.705088]
task 0 get [0.705088]
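For reference, with loss w^2 - 2w and learning rate 0.1, each synchronized step applies w <- w - 0.1 * (2w - 2) = 0.8 * w + 0.2, so w_t = 1 - 0.9 * 0.8^t and w_5 = 1 - 0.9 * 0.32768 = 0.705088 exactly. A minimal NumPy sketch (not part of the sample) to replay that trajectory in float32:

import numpy as np

# Replay the expected float32 update w <- w - 0.1 * (2w - 2). This is
# what the sample computes: both workers hold the same weight, so the
# all-reduced mean gradient equals each worker's own gradient.
w = np.float32(0.1)
for step in range(1, 6):
    grad = np.float32(2.0) * w - np.float32(2.0)  # d/dw (w^2 - 2w)
    w -= np.float32(0.1) * grad
    print('step {}: w = {}'.format(step, w))
# Expected to match the sample (0.28, ..., 0.705088) up to float32
# printing. Note the project's run below already diverges at its third
# value (0.42396685 vs 0.42400002), which suggests the gradients being
# reduced there differ slightly, not just the final rounding.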
Worker0 output in the project, with the code snippet embedded:
task 0: before syncing ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 0: running init_ops ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 0: obtaining gstep ...
train one step at step 0.
2019-09-30 10:15:56.954439: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.1, 'loss': -0.19, 'global_step': 1}
train one step at step 1.
2019-09-30 10:15:56.959006: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.28, 'loss': -0.4816, 'global_step': 2}
train one step at step 2.
2019-09-30 10:15:56.962205: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.42396685, 'loss': -0.66818583, 'global_step': 3}
train one step at step 3.
2019-09-30 10:15:56.965719: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.53912044, 'loss': -0.78759, 'global_step': 4}
train one step at step 4.
2019-09-30 10:15:56.969335: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.6312327, 'loss': -0.8640107, 'global_step': 5}
{'u': 0.70491827, 'loss': -0.9129268, 'global_step': 5}
task 0: final syncing ...
task 0: finalized
Worker1 output:
task 1: before syncing ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 1: running init_ops ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 1: obtaining gstep ...
train one step at step 0.
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
train one step at step 1.
{'u': 0.28, 'loss': -0.4816, 'global_step': 1}
train one step at step 2.
{'u': 0.42396685, 'loss': -0.66818583, 'global_step': 2}
train one step at step 3.
{'u': 0.53912044, 'loss': -0.78759, 'global_step': 3}
train one step at step 4.
{'u': 0.6312327, 'loss': -0.8640107, 'global_step': 4}
{'u': 0.70491827, 'loss': -0.9129268, 'global_step': 5}
task 1: final syncing ...
task 1: finalized