Принимает ли Tenorflow Estimator разные партии для рабочих при использовании MirroredStrategy? - PullRequest
0 голосов
/ 23 января 2019

Я использую GANEstimator с MirroredStrategy для работы на нескольких графических процессорах одного экземпляра.input_fn в моем случае - tf.data.Dataset со следующими настройками:

dataset = dataset.repeat()
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.batch(self.batch_size, drop_remainder=True)
dataset = dataset.prefetch(100)

Причина, по которой я спрашиваю это, заключается в том, что мне нужно указать что-то вроде dataset.shard() вручную, чтобы передавать разные данныерабочим?Я копаю код Оценщик и MirroredStrategy , но мне неясно, что происходит.Дополнительная путаница возникает из описания распределенных стратегий :

MirroredStrategy: This does in-graph replication with synchronous 
training on many GPUs on one machine. Essentially, we create copies of all
variables in the model's layers on each device. We then use all-reduce 
to combine gradients across the devices before applying them 
to the variables to keep them in sync.

CollectiveAllReduceStrategy: This is a version of MirroredStrategy 
for multi-worker training. 

Так использует ли MirroredStratedy только одного работника?Я не понимаю этоМне нужно указать размер партии, равный вместимости одной башни, в противном случае я получу OOM.Может кто-нибудь указать мне код и объяснить, как такая простая установка работает с пакетами:

def create_dataset():
    ...
    dataset = dataset.repeat()
    dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.batch(self.batch_size, drop_remainder=True)
    dataset = dataset.prefetch(100)
    return dataset



NUM_GPUS = 4
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)

optimizer = tf.train.RMSPropOptimizer(learning_rate=0.01, use_locking=True)
optimizer_d = tf.train.RMSPropOptimizer(learning_rate=0.01, use_locking=True)

config = tf.estimator.RunConfig(save_checkpoints_steps=100, 
          save_summary_steps=1, keep_checkpoint_max=50, 
          train_distribute=strategy)

# I have more hooks here, just simplified to show 
def get_hooks_fn(GANTrainOps):

    disjoint_train_hook_func = tfgan.get_sequential_train_hooks(
                 train_steps=tfgan.GANTrainSteps(10, 1)
                 ) # g steps, d steps
    disjoint_train_hooks = disjoint_train_hook_func(GANTrainOps)
    return [update_hook, summary_hook] + disjoint_train_hooks


# Create GAN estimator.
gan_estimator = tfgan.estimator.GANEstimator(
    model_dir = '/data/checkpoints/estimator_model', 
    generator_fn = generator_fn,
    discriminator_fn = discriminator_fn,
    generator_loss_fn = generator_loss_fn, 
    discriminator_loss_fn = discriminator_loss_fn, 
    generator_optimizer = optimizer,
    discriminator_optimizer = optimizer_d, 
    use_loss_summaries=True,
    config=config,
    get_hooks_fn=get_hooks_fn)


gan_estimator.train(input_fn=create_dataset, steps=10000)

Спасибо!

Код MirroredStrategy содержит:

1Странная формулировка:

Версия этого класса для нескольких рабочих отображает одну реплику на одно устройство на рабочем.Он отражает все переменные модели на всех репликах.Например, если у вас есть два worker с, а у каждого worker есть 4 графических процессора, на этих 8 графических процессорах будет создано 8 копий переменных модели.Затем, как и в MirroredStrategy (???), каждая реплика выполняет свои вычисления с собственной копией переменных, если только в модели кросс-реплики, где происходит уменьшение переменной или тензора.

2)

auto_shard_dataset: выполнять ли автоматическое разбиение набора данных при наличии нескольких рабочих.

По умолчанию этот параметр имеет значение False.

РЕДАКТИРОВАТЬ:

До сих пор я обнаружил, что tf.estimator.train() через некоторое время указывает на то, чтокажется, что strategy.make_input_fn_iterator():

def _get_iterator_from_input_fn(self, input_fn, mode, distribution=None):
    if distribution is not None:
      iterator = distribution.make_input_fn_iterator(
          lambda _: self._call_input_fn(input_fn, mode))
      input_hooks = [
          estimator_util.DistributedIteratorInitializerHook(iterator)]
    else:
      result = self._call_input_fn(input_fn, mode)
      iterator = result.make_initializable_iterator()
      input_hooks = [estimator_util._DatasetInitializerHook(iterator)]  
return iterator, input_hooks

make_input_fn_iterator()

Но он был удален из кода MirroredStrategy и больше не существует!Я не понимаю, как это работает и где набор данных на самом деле разделен.

EDIT2: Я не могу найти строку make_input_fn_iterator в моем распределении тензор потока 1.12.0 с помощью grep.Похоже, в коде он полностью отсутствует.

1 Ответ

0 голосов
/ 23 января 2019

Хорошо, потратив некоторое время на изучение github, я обнаружил, что он уже отличается от моего tf 1.12.0. Итак, заход в локальные файлы 1.12.0 дал мне:

GANEstimator наследует tf.python.estimator.Estimator

Estimator.init():

# The distribute field contains an instance of DistributionStrategy.
    self._train_distribution = self._config.train_distribute

Тогда путь вниз:

tf.contrib.gan.GANEstimator -> tf.python.estimator.Estimator.train() --> 
tf.python.estimator.Estimator._train_model(input_fn, hooks, saving_listeners) --> 
._train_model_distributed(input_fn, hooks, saving_listeners) --> 
._get_iterator_from_input_fn(input_fn, model_fn_lib.ModeKeys.TRAIN, self._train_distribution) --> 
distribution.distribute_dataset(lambda: self._call_input_fn(input_fn, mode))

, который вызывает в моем случае для MirrorredStrategy.distribute_dataset():

def distribute_dataset(self, dataset_fn):
    if self._cluster_spec:
      return values.MultiWorkerDataset(
          partial(self._call_dataset_fn, dataset_fn), self._worker_device_map,
          self._prefetch_on_device, self._auto_shard_dataset)
    else:
      return values.PerDeviceDataset(
          self._call_dataset_fn(dataset_fn), self._devices,
          self._prefetch_on_device)

tensorflow/python/training/distribute.py:

  def _call_dataset_fn(self, dataset_fn):
    result = dataset_fn()
    if not isinstance(result, dataset_ops.Dataset):
      raise ValueError(
          "dataset_fn() must return a tf.data.Dataset when using a "
          "DistributionStrategy.")
    return result

Я предполагаю, что используется PerDeviceDataset, поэтому, наконец, я нахожу эти два класса в values.py:

class PerDeviceDataset(object):
  """Like `tf.data.Dataset` split devices, producing `PerDevice` data."""

  def __init__(self, dataset, devices, prefetch_on_device=None):
    self._devices = devices

    # Default to using prefetching in graph mode, unless specified.
    # TODO(priyag): Enable prefetching in eager mode.
    self._prefetch_on_device = prefetch_on_device
    if self._prefetch_on_device is None:
      self._prefetch_on_device = not context.executing_eagerly()
    assert not (self._prefetch_on_device and context.executing_eagerly()), (
        "Prefetching is only supported in graph mode currently")

    if self._prefetch_on_device:
      self._dataset = dataset.apply(
          prefetching_ops_v2.prefetch_to_devices(self._devices))
    else:
      # TODO(priyag): If dropping remainder is not appropriate, find another
      # approach to distributing the dataset when not possible to divide evenly.
      # Possibly not an issue when we start using PartitionedDataset.
      self._dataset = dataset.batch(len(devices), drop_remainder=True)

  def make_one_shot_iterator(self):
    """Get a one time use iterator for the distributed PerDeviceDataset."""
    dataset_iterator = self._dataset.make_one_shot_iterator()
    return PerDeviceDataIterator(dataset_iterator, self._devices,
                                 self._prefetch_on_device)

  def make_initializable_iterator(self):
    """Get an initializable iterator for the distributed PerDeviceDataset."""
    dataset_iterator = self._dataset.make_initializable_iterator()
    return PerDeviceDataIterator(dataset_iterator, self._devices,
                                 self._prefetch_on_device)


class PerDeviceDataIterator(object):
  """An iterator (like `tf.data.Iterator`) into a `PerDeviceDataset`."""

  def __init__(self, iterator, devices, prefetch_on_device=None):
    self._iterator = iterator
    self._devices = devices
    self._prefetch_on_device = prefetch_on_device

  @property
  def initializer(self):
    return self._iterator.initializer

  def get_next(self, name=None):
    """Scatter the input across devices."""
    if self._prefetch_on_device:
      data_list = self._iterator.get_next(name=name)
      index = dict(zip(self._devices, data_list))
    else:
      batch = self._iterator.get_next(name=name)
      index = {}
      def get_ith(i):
        return lambda x: x[i]

      for i, d in enumerate(self._devices):
        index[d] = nest.map_structure(get_ith(i), batch)
        if context.executing_eagerly():
          with ops.device(d):
            index[d] = nest.map_structure(array_ops.identity, index[d])

    return regroup(index)

Итак, насколько я понимаю, и сначала моя функция dataset_fn() просто вызывается для получения объекта набора данных, а затем к нему применяется пакет с размером числа графических процессоров. Элементы этого пакета, которые должны быть фактическими партиями, определенными в моей инициализации набора данных внутри dataset_fn(), назначаются различным устройствам.

...