Why does "Skipping rendezvous re-initialization" appear in collective_ops calls in TensorFlow?

collective_ops.all_reduce from tensorflow.python.ops is very useful for aggregating gradients in a distributed setting. I tried it in a standalone code snippet and it works fine. When the same method is embedded in a project it still runs, but the output differs slightly from the standalone snippet: 0.70491827 instead of 0.705088. In addition, worker0 keeps printing extra log lines about rendezvous re-initialization, even though worker1 was set as the collective_group_leader. I wonder where this problem comes from and whether there is any hint on how to fix it. The code runs on TensorFlow 1.13 with Python 3.6.8.
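For what it's worth, the INFO line itself can be hidden by raising the C++ log threshold before TensorFlow is imported, although that only suppresses the message instead of explaining it:

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'  # filter out INFO-level C++ logs
import tensorflow as tf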

The standalone sample that works without problems:

"""Illustrate all reduce using collective_ops
"""

MP_METHOD = 'fork'  # 'fork' (UNIX) or 'spawn' (Windows)
NUM_PROCESSES = 2

def process_fn(worker_hosts, task_index):
    """allreduce process"""

    import os
    import time
    import tensorflow as tf
    from tensorflow.python.ops import collective_ops

    os.environ['CUDA_VISIBLE_DEVICES'] = str(task_index)
    cluster_spec = tf.train.ClusterSpec({'worker': worker_hosts})

    config = tf.ConfigProto()
    config.experimental.collective_group_leader = '/job:worker/replica:0/task:0'
    server = tf.train.Server(cluster_spec, config=config,
                             job_name='worker', task_index=task_index)
    with tf.Graph().as_default():
        instance_count = 0
        with tf.variable_scope('worker{}'.format(task_index)):
            device = '/job:worker/replica:0/task:{}/device:CPU:0'.format(
                task_index)
            with tf.device(device):
                weight = tf.get_variable('weight', dtype=tf.float32,
                                         initializer=0.1)
                loss = weight * weight - 2 * weight
                trainable_variables = [weight]
                gradients = tf.gradients(loss, trainable_variables)
                reduced_gradients = list()
                group_size = NUM_PROCESSES  # must be identical on every worker
                group_key = NUM_PROCESSES   # identifies the collective group
                instance_key = 2            # unique per collective instance
                print('allreduce : {:4d}, {:4d}, {:4d}'.format(
                    group_size, group_key, instance_key))
                for gradient in gradients:
                    # merge_op 'Add' then final_op 'Div' averages the
                    # gradients across the group
                    reduced_gradients.append(collective_ops.all_reduce(
                        gradient, group_size, group_key, instance_key,
                        'Add', 'Div'))
                    instance_key += 1
                optimizer = tf.train.GradientDescentOptimizer(0.1)
                train_op = optimizer.apply_gradients(zip(
                    reduced_gradients, trainable_variables))

        session_creator = tf.train.ChiefSessionCreator(master=server.target)
        with tf.train.MonitoredSession(session_creator=session_creator) \
                as mon_sess:
            result = mon_sess.run(weight)
            print('task {} sense {}'.format(task_index, result))
            for gstep in range(5):
                result, _ = mon_sess.run([trainable_variables, train_op])
                print('task {} get {}'.format(task_index, result))
            result = mon_sess.run(trainable_variables)
            print('task {} get {}'.format(task_index, result))
            time.sleep(0.1)

def start_process():
    """start process"""
    import time
    import multiprocessing as mp

    port = 60000
    host_fmt = 'localhost:{}'
    worker_hosts = list()
    for process_index in range(NUM_PROCESSES):
        worker_hosts.append(host_fmt.format(port + process_index))
    mp_ctx = mp.get_context(MP_METHOD)
    processes = list()
    for process_index in range(NUM_PROCESSES):
        process = mp_ctx.Process(target=process_fn,
                                 args=(worker_hosts, process_index,))
        processes.append(process)
        process.start()
        time.sleep(0.1)
    for process in processes:
        process.join()

if __name__ == '__main__':
    start_process()
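(Note that tensorflow is imported inside process_fn rather than at module level, so each child process initializes its own runtime; that should be what makes MP_METHOD = 'fork' safe here, and 'spawn' would behave the same way.)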

Sample output:

task 1 sense 0.10000000149011612
task 0 sense 0.10000000149011612
task 1 get [0.28]
task 0 get [0.28]
task 1 get [0.42400002]
task 0 get [0.42400002]
task 1 get [0.5392]
task 0 get [0.5392]
task 1 get [0.63136]
task 0 get [0.63136]
task 1 get [0.705088]
task 0 get [0.705088]
task 1 get [0.705088]
task 0 get [0.705088]
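For reference, this sequence is exactly plain gradient descent on loss = w * w - 2 * w with learning rate 0.1; both workers compute identical gradients, so averaging them changes nothing. A quick standalone check:

# d(loss)/dw = 2*w - 2, so each SGD step is
# w <- w - 0.1 * (2*w - 2) = 0.8 * w + 0.2
w = 0.1
for _ in range(5):
    w = 0.8 * w + 0.2
    print(w)  # 0.28, 0.424, 0.5392, 0.63136, 0.705088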

Output of worker 0 in the project with the snippet embedded:

task 0: before syncing ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 0: running init_ops ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 0: obtaining gstep ...
train one step at step 0.
2019-09-30 10:15:56.954439: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.1, 'loss': -0.19, 'global_step': 1}
train one step at step 1.
2019-09-30 10:15:56.959006: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.28, 'loss': -0.4816, 'global_step': 2}
train one step at step 2.
2019-09-30 10:15:56.962205: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.42396685, 'loss': -0.66818583, 'global_step': 3}
train one step at step 3.
2019-09-30 10:15:56.965719: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.53912044, 'loss': -0.78759, 'global_step': 4}
train one step at step 4.
2019-09-30 10:15:56.969335: I tensorflow/core/distributed_runtime/base_rendezvous_mgr.cc:159] Skipping rendezvous re-initialization.
{'u': 0.6312327, 'loss': -0.8640107, 'global_step': 5}
{'u': 0.70491827, 'loss': -0.9129268, 'global_step': 5}
task 0: final syncing ...
task 0: finalized

Output of worker 1:

task 1: before syncing ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 1: running init_ops ...
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
task 1: obtaining gstep ...
train one step at step 0.
{'u': 0.1, 'loss': -0.19, 'global_step': 0}
train one step at step 1.
{'u': 0.28, 'loss': -0.4816, 'global_step': 1}
train one step at step 2.
{'u': 0.42396685, 'loss': -0.66818583, 'global_step': 2}
train one step at step 3.
{'u': 0.53912044, 'loss': -0.78759, 'global_step': 3}
train one step at step 4.
{'u': 0.6312327, 'loss': -0.8640107, 'global_step': 4}
{'u': 0.70491827, 'loss': -0.9129268, 'global_step': 5}
task 1: final syncing ...
task 1: finalized
...
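Two more observations. First, in the project the values diverge from the standalone sample already at the first reduced step (0.42396685 vs 0.42400002), so the reduced gradient itself seems to differ slightly, not only the final weight; the printed global_step on worker1 also lags one behind worker0. Second, the "Skipping rendezvous re-initialization" line has the "I" (INFO) prefix, so it looks informational rather than an error, yet only worker0 emits it. To compare the reduced gradients directly, I could add a debugging line like this to the monitored session in the snippet above (purely illustrative):

result = mon_sess.run(reduced_gradients)
print('task {} reduced gradients {}'.format(task_index, result))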