Почему время вычислений этого минимального кода Tensorflow V2 увеличивается с количеством графических процессоров вместо параллельной работы? - PullRequest
0 голосов
/ 05 ноября 2019

Я попытался использовать стратегию зеркального распределения Tensorflow V2 по нескольким графическим процессорам со следующим кодом (ввод данных и модель являются фиктивными).

Как видите, на каждой итерации выводится время вычисления шага обучения. ,Как ни странно, вычисления увеличиваются по мере того, как я увеличиваю количество графических процессоров.

$ CUDA_VISIBLE_DEVICES=0 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 0.473 sec/step | loss: 46485.430
Ep 01/100 | step 03 | 0.482 sec/step | loss: 9216.726

$ CUDA_VISIBLE_DEVICES=0,1 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 1.141 sec/step | loss: 22627.699
Ep 01/100 | step 03 | 1.091 sec/step | loss: 11679.490

$ CUDA_VISIBLE_DEVICES=0,1,2 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 1.408 sec/step | loss: 32166.996
Ep 01/100 | step 03 | 1.380 sec/step | loss: 14036.578
  • Я проверил, что входные и выходные данные для каждой реплики одинаковы в этих ситуациях.
  • Я повторяю здесь те же фиктивные данные, поэтому не думаю, что это проблема, связанная с конвейером данных.

Почему я не получаю преимущества параллелизма здесь? Любой совет, пожалуйста?

Код выглядит следующим образом:

from __future__ import absolute_import, division, print_function, unicode_literals
import os, time, sys, numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import (Conv2D, Conv3D, Dense)


@tf.function
def loss_fn(y_pred, y_true):
    return tf.reduce_mean(tf.math.square(y_pred - y_true))

@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        inputs, labels = inputs

        # tf.print("in", tf.shape(inputs), "out", tf.shape(labels), output_stream=sys.stdout)
        with tf.GradientTape() as tape:
            out = model(inputs)
            loss_value = loss_fn(out, labels)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        return loss_value

    per_example_losses = strategy.experimental_run_v2(step_fn, args=(dist_inputs,))
    mean_loss = strategy.reduce(tf.distribute.ReduceOp.MEAN, per_example_losses, axis=None)
    return mean_loss


if __name__ == "__main__":

    BATCH_SIZE_PER_SYNC = 4
    logdir = os.path.join('logs/test')
    strategy = tf.distribute.MirroredStrategy()
    num_gpus = strategy.num_replicas_in_sync
    global_batch_size = BATCH_SIZE_PER_SYNC * num_gpus
    print('num GPUs: {}, global batch size: {}'.format(num_gpus, global_batch_size))

    # fake data ------------------------------------------------------
    fakea = np.random.rand(global_batch_size, 10, 200, 200, 128).astype(np.float32)
    targets = np.random.rand(global_batch_size, 200, 200, 14)

    # tf.Dataset ------------------------------------------------------
    def gen():
        while True:
            yield (fakea, targets)

    dataset = tf.data.Dataset.from_generator(gen,
        (tf.float32, tf.float32),
        (tf.TensorShape(fakea.shape), tf.TensorShape(targets.shape)))

    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

    # Model ------------------------------------------------------
    training = True
    with strategy.scope():
        # Model
        va = keras.Input(shape=(10, 200, 200, 128), dtype=tf.float32, name='va')
        x = Conv3D(64, kernel_size=3, strides=1, padding='same')(va)
        x = Conv3D(64, kernel_size=3, strides=1, padding='same')(x)
        x = Conv3D(64, kernel_size=3, strides=1, padding='same')(x)
        x = tf.reduce_max(x, axis=1, name='maxpool')  # [ΣK, 128]
        b = Conv2D(14, kernel_size=3, padding='same')(x)
        model = keras.Model(inputs=va, outputs=b, name='net')
        optimizer = keras.optimizers.RMSprop()
    model.summary()

    # TRAIN ---------------------------------------------------------
    writer = tf.summary.create_file_writer(logdir)

    num_steps = 100
    num_epoches = 100
    global_step = 0

    with strategy.scope():
        iterator = iter(dist_dataset)
        with writer.as_default():
            for epoch in range(num_epoches):
                for step in range(num_steps):

                    if global_step == 0 or 5 < global_step < 8:
                        tf.summary.trace_on(graph=True, profiler=True)

                    start = time.time()
                    loss_value = train_step(next(iterator))
                    duration = time.time() - start

                    prefix = 'Ep {:02d}/{:02d} | step {:02d} '.format(epoch + 1, num_epoches, step)
                    suffix = '| {:.3f} sec/step | loss: {:.3f} '.format(duration, float(loss_value))
                    print(prefix + suffix)

                    tf.summary.scalar("loss", loss_value, step=global_step)

                    if global_step == 0 or 5 < global_step < 8:
                        tf.summary.trace_export(name="model_trace", step=global_step, profiler_outdir=logdir)
                    writer.flush()
                    global_step += 1
...