I get an error when I try to run distributed training with Edward. The code is below:
import os
import numpy as np
import tensorflow as tf
import edward as ed
from edward.models import Bernoulli, Normal
from edward.util import Progbar

parameter_servers = ["localhost:2222"]
workers = ["localhost:2223"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})

# start a server for a specific task (job_name / task_index come from
# command-line flags; see the sketch after the listing)
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
class VAE(object):
    def __init__(self, n_features, dir_path):
        # MODEL
        self.n_features = n_features
        self.params = {
            'M': 2048,
            'd': 5,
            'n_epoch': 2,
            'hidden_d': [25, 5],
            'learning_rate': 0.01
        }
        self.saved_dir_path = dir_path
        self.ckpt_path = os.path.join(
            self.saved_dir_path, 'checkpointFiles/') + 'model.ckpt'
        # distributed training
        if FLAGS.job_name == "ps":
            server.join()
        elif FLAGS.job_name == "worker":
            # Between-graph replication
            with tf.device(tf.train.replica_device_setter(
                    worker_device="/job:worker/task:%d" % FLAGS.task_index,
                    cluster=cluster)):
                self.global_step = tf.get_variable(
                    'global_step',
                    [],
                    dtype=tf.int64,
                    initializer=tf.constant_initializer(0),
                    trainable=False)
                self.z = Normal(
                    loc=tf.zeros([self.params['M'], self.params['d']]),
                    scale=tf.ones([self.params['M'], self.params['d']]))
                self.hidden = tf.layers.dense(
                    self.z, self.params['hidden_d'][0],
                    activation=tf.nn.relu)
                if len(self.params['hidden_d']) > 1:
                    for i in xrange(1, len(self.params['hidden_d'])):
                        self.hidden = tf.layers.dense(
                            self.hidden,
                            self.params['hidden_d'][i],
                            activation=tf.nn.relu)
                self.x = Bernoulli(
                    logits=tf.layers.dense(self.hidden, self.n_features),
                    dtype=tf.float64)
                # INFERENCE
                self.x_ph = tf.placeholder(
                    dtype=tf.float64, shape=[None, self.n_features])
                self.hidden = tf.layers.dense(
                    tf.cast(self.x_ph, tf.float32),
                    self.params['hidden_d'][-1],
                    activation=tf.nn.relu)
                if len(self.params['hidden_d']) > 1:
                    for i in xrange(1, len(self.params['hidden_d'])):
                        j = -(1 + i)
                        self.hidden = tf.layers.dense(
                            self.hidden,
                            self.params['hidden_d'][j],
                            activation=tf.nn.relu)
                self.qz = Normal(
                    loc=tf.layers.dense(self.hidden, self.params['d']),
                    scale=tf.layers.dense(
                        self.hidden, self.params['d'],
                        activation=tf.nn.softplus))
                self.x_avg = Bernoulli(
                    logits=tf.reduce_mean(self.x.parameters['logits'], 0),
                    name='x_avg')
                self.log_likli = tf.reduce_mean(
                    self.x_avg.log_prob(self.x_ph), 1)
                self.optimizer = tf.train.RMSPropOptimizer(
                    self.params['learning_rate'], epsilon=1.0)
                self.inference = ed.KLqp(
                    {self.z: self.qz}, data={self.x: self.x_ph})
                self.inference_init = self.inference.initialize(
                    optimizer=self.optimizer,
                    global_step=self.global_step,
                    logdir='log')
                self.init = tf.global_variables_initializer()
                self.saver = tf.train.Saver()
    def train(self, train_data):
        # Generate x_batch: wrap the batching loop in a generator so the
        # training loop below can pull one minibatch at a time
        def batch_generator(data):
            start = 0  # pointer to where we are in iteration
            while True:
                stop = start + self.params['M']
                diff = stop - data.shape[0]
                if diff <= 0:
                    batch = data[start:stop]
                    start += self.params['M']
                else:
                    batch = np.concatenate((data[start:], data[:diff]))
                    start = diff
                yield batch

        train_data_generator = batch_generator(train_data)
        saver_hook = tf.train.CheckpointSaverHook(
            checkpoint_dir=FLAGS.model_path,
            save_steps=100,
            saver=tf.train.Saver(),
            checkpoint_basename='model.ckpt',
            scaffold=None)
        hooks = [saver_hook]
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(FLAGS.task_index == 0),
                checkpoint_dir=FLAGS.model_path,
                hooks=hooks,
                config=tf.ConfigProto(allow_soft_placement=True,
                                      log_device_placement=True),
                save_summaries_steps=None,
                save_summaries_secs=None) as sess:
            sess.run(self.init)
            # sess.run(self.inference_init)
            # self.inference.initialize(optimizer=self.optimizer)
            n_iter_per_epoch = np.ceil(
                train_data.shape[0] / self.params['M']).astype(int)
            for epoch in xrange(1, self.params['n_epoch'] + 1):
                print "Epoch: {0}".format(epoch)
                avg_loss = 0.0
                pbar = Progbar(n_iter_per_epoch)
                for t in xrange(1, n_iter_per_epoch + 1):
                    pbar.update(t)
                    x_batch = next(train_data_generator)
                    info_dict = self.inference.update(
                        feed_dict={self.x_ph: x_batch})
                    avg_loss += info_dict['loss']
                avg_loss /= n_iter_per_epoch
                avg_loss /= self.params['M']
                print "-log p(x) <= {:0.3f}".format(avg_loss)
        print "Done training the model."
if __name__ == "__main__":
    vae = VAE()
    vae.train(data)
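The FLAGS definitions and the launch commands are not shown in the listing; a minimal sketch of what the code above assumes (the flag names match the FLAGS.* attributes used in the class; the script name and the model_path default are placeholders):

# hypothetical flag definitions matching the FLAGS.* names used above
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of the task within its job")
tf.app.flags.DEFINE_string("model_path", "/tmp/vae_ckpt", "Checkpoint directory")
FLAGS = tf.app.flags.FLAGS

# one terminal per process, for example:
#   python vae_distributed.py --job_name=ps --task_index=0
#   python vae_distributed.py --job_name=worker --task_index=0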
The error is the following:
Cannot assign a device for operation 'optimizer/dense_1/bias/RMSProp_1': Operation was explicitly assigned to /job:ps/task:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0 ]. Make sure the device specification refers to a valid device.
[[Node: optimizer/dense_1/bias/RMSProp_1 = VariableV2[_class=["loc:@dense_1/bias"], container="", dtype=DT_FLOAT, shape=[5], shared_name="", _device="/job:ps/task:0"]()]]
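For reference, a small diagnostic sketch (separate from the training script) that prints the devices a session connected to server.target can see, assuming a TensorFlow version that provides Session.list_devices(); the error above reports only /job:localhost/replica:0/task:0/device:CPU:0 as available:

# diagnostic only: show which devices a session bound to this server reports
print(server.target)
with tf.Session(server.target) as sess:
    for dev in sess.list_devices():
        print(dev.name)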