Чтобы уменьшить количество синхронизаций в распределенном обучении, я хочу сначала выполнить локальное накопление градиентов.точно так же, как вы можете иметь несколько графических процессоров, но последовательно, а не параллельно.
Я хочу использовать его в цикле estimator.train со стратегией распределения, такой как зеркальное отображение и коллективное allreduce и т. д.
Вот моя реализация, пожалуйста, дайте мне некоторые входные данные:)
Во-первых, потому что мне нужно запустить другой граф в session.run (), поэтому я изменил estimator.EstimatorSpec, чтобы получить больше операций.Во-вторых, кажется, что нет ясного способа создать локальную переменную без общего доступа в локальном GPU в среде стратегии распределения.Мне пришлось взломать некоторую переменную_создания_области.
Вот взломанная функция переменной_создателя,
def skip_all_scope_variable_creator(next_creator=None, on_device=None, **kwargs):
#print("skip_all_scope_variable_creator:[{}]".format(kwargs))
initial_value = kwargs.get("initial_value", None)
trainable = kwargs.get("trainable", None)
collections = kwargs.get("collections", None)
validate_shape = kwargs.get("validate_shape", True)
caching_device = kwargs.get("caching_device", None)
name = kwargs.get("name", None)
variable_def = kwargs.get("variable_def", None)
dtype = kwargs.get("dtype", None)
expected_shape = kwargs.get("expected_shape", None)
import_scope = kwargs.get("import_scope", None)
constraint = kwargs.get("constraint", None)
use_resource = kwargs.get("use_resource", None)
with tf.device(on_device) :
return resource_variable_ops.ResourceVariable(
initial_value=initial_value, trainable=trainable,
collections=collections, validate_shape=validate_shape,
caching_device=caching_device, name=name, dtype=dtype,
constraint=constraint, variable_def=variable_def,
import_scope=import_scope)
Вот мой код внутри model_fn () для создания трех операций,
loss = loss_from_model
optimizer = some_optimizer
tvars = tf.trainable_variables()
gradients = optimizer.compute_gradients(
loss, tvars, colocate_gradients_with_ops=True)
accumulate_pass_num = FLAGS.pass_per_batch
if accumulate_pass_num > 1 :
accum_grads = []
accum_vars = []
reset_grad_ops = []
accum_grad_ops = []
for g,v in gradients:
accum_vars.append(v)
if g is not None:
with tf.variable_creator_scope(lambda next_creator=None, **kwargs: skip_all_scope_variable_creator(next_creator, g.device, **kwargs)):
print("create accum_grad for variable:{}".format(v.name))
tmp_grad_on_device = tf.Variable(tf.zeros_like(g), trainable=False, synchronization=tf.VariableSynchronization.ON_READ, collections=[tf.GraphKeys.LOCAL_VARIABLES], name='tmp_accum_grad')
reset_one_grad_op = tf.assign(tmp_grad_on_device, g, name="reset_accumulated_gradient_op")
reset_grad_ops.append(reset_one_grad_op)
# the return of assign_add is the value will be update
accum_grad_on_device = tmp_grad_on_device.assign_add(g, name="accumulate_gradient")
accum_grad_ops.append(accum_grad_on_device)
accum_grads.append(accum_grad_on_device)
else:
accum_grads.append(None)
accumulate_gradients_op = tf.group(*accum_grad_ops, name="grouped_accu_grad_op")
reset_gradients_op = tf.group(*reset_grad_ops, name="grouped_reset_gradients_op")
accum_grad_means = [tf.multiply(v, 1.0/accumulate_pass_num) if v is not None else None for v in accum_grads]
accum_grads_vars = zip(accum_grad_means, accum_vars)
minimize_op = optimizer.apply_gradients(
accum_grads_vars, global_step=global_step, name="train")
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
train_op = tf.group(minimize_op, update_ops)
return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op, accumulate_gradients_op=accumulate_gradients_op, reset_gradients_op=reset_gradients_op, accumulate_pass_num=accumulate_pass_num)
Вот модифицированный estimator.train () для запуска разных операций,
while not mon_sess.should_stop():
if estimator_spec.accumulate_pass_num > 1 :
# reset gradiends first
mon_sess.run([estimator_spec.reset_gradients_op])
for _ in range(estimator_spec.accumulate_pass_num-2):
mon_sess.run([estimator_spec.accumulate_gradients_op])
_, loss = mon_sess.run([estimator_spec.train_op, estimator_spec.loss])
Я попробовал это на модели трансформатора в официальном репозитории моделей Google.Результаты были хорошими.
Мой вопрос: есть ли лучшие способы сделать это?
Должен ли я рассмотреть использование tf.cond () для выбора операций, возвращаемых в model_fn, чтобы Estimator и EstimatorSpec не нуждались в изменении?Но это кажется очень сложным: (
Большое спасибо!
Донг