У меня есть Variational Autoencoder (VAE), который я хотел бы преобразовать в Условный Variational Autoencoder (CVAE). Цель состоит в том, чтобы использовать CVAE для пересылки модели моей наблюдаемой (которая задается одномерным временным рядом), как объяснено ниже.
Входные выборки для VAE являются одномерными временными рядами. У меня 2700 из них в моем тренировочном наборе, и 300 из них в моем тестовом наборе. Условная переменная представляет собой набор из трех координат x,y,z
. Для каждого триплета x,y,z
существует только один возможных 1D временных рядов. Я хотел бы обучить CVAE, чтобы я мог повторно использовать декодер для пересылки модели временного ряда: вводя новые триплеты x,y,z
, я хотел бы видеть, как выглядят соответствующие 1D временные ряды.
Проблема состоит в том (я считаю), что мне трудно одновременно уменьшать потери KL и Reconstruction (последний - MAE). Если я позволю, чтобы потери KL и Reconstruction имели одинаковый вес, KL слишком быстро сходится и «заглушает» потери Reconstruction, что не позволяет уменьшить ее выше определенной точки. Если я умножу потери KL на постоянный коэффициент (малый, например, 0,0001), то эта архитектура «работает» как VAE, что означает, что 1-мерные временные ряды, отправленные на вход, корректно восстанавливаются. Это достигается за счет потери KL, которая становится огромной по сравнению с потерями на реконструкцию. Следовательно, эта модель не может быть обучена как CVAE, потому что для ее использования в качестве CVAE потери KL также должны быть уменьшены аналогично потерям на восстановление.
Наконец, если я использую отжиг KL ( как показано ниже в моей реализации), мне удается уменьшить потери как KL, так и Reconstruction, но по какой-то странной причине кажется, что этого недостаточно для работы CVAE: одномерный временной ряд, сгенерированный из заданных координат в набор данных тестирования просто ужасен и не соответствует 1D временному ряду в наборе тестирования.
Это моя реализация:
epsilon_std = 1.0
# dimension of input
n_x = X_train.shape[1] # 2001 time components for each time series
n_y = y_train.shape[1] # 3 coordinates x,y,z
# 2700 samples for input, 300 for output
X_train = np.reshape(X_train, [2700, n_x, 1])
X_test = np.reshape(X_test, [300, n_x, 1])
y_train = np.reshape(y_train, [2700, n_y, 1])
y_test = np.reshape(y_test, [300, n_y, 1])
# dimension of latent space (batch size by latent dim)
batch_size = 100
latent_dim = 700
### network parameters
Conv_1 = 8
Conv_2 = 16
Dense_1 = 700
Dense_2 = 400
# Convolution Kernel
Kernel_1 = 16
Kernel_2 = 64
x = Input(shape=(n_x,1))
cond = Input(shape=(n_y, 1))
inputs = concatenate([x, cond], axis=1)
conv_1 = Conv1D(Conv_1, kernel_size=Kernel_1,
padding='valid', strides=Stride_1, activation='tanh')(inputs)
maxp1 = MaxPooling1D(2)(conv_1)
conv_2 = Conv1D(Conv_2, kernel_size=Kernel_2,
padding='valid', strides=Stride_2, activation='tanh')(maxp1)
maxp2 = MaxPooling1D(2)(conv_2)
flatten = Flatten()(maxp2)
hidden = Dense(Dense_1, activation='tanh')(flatten)
z_mean = Dense(latent_dim)(hidden)
z_log_var = Dense(latent_dim)(hidden)
def sampling(args):
z_mean, z_log_var = args
epsilon = K.random_normal(shape=(batch_size, latent_dim),
mean=0., stddev=epsilon_std)
return(z_mean + K.exp(z_log_var/2) * epsilon)
z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
z = Reshape([latent_dim, 1])(z)
z_cond = concatenate([z, cond], axis=1)
de_conv_1 = Conv1D(Conv_2, kernel_size=Kernel_2,
padding='valid', activation='tanh')
upsamp1 = UpSampling1D(2)
de_conv_2 = Conv1D(Conv_1, kernel_size=Kernel_1,
padding='valid', activation='tanh')
upsamp2 = UpSampling1D(2)
flatten = Flatten()
x_decoded_mean0 = Dense(n_x)
x_decoded_mean = Reshape([n_x, 1])
# decoder (as part of the CVAE)
h_p = de_conv_1(z_cond)
h_p = upsamp1(h_p)
h_p = de_conv_2(h_p)
h_p = upsamp2(h_p)
h_p = flatten(h_p)
h_p = x_decoded_mean0(h_p)
out = x_decoded_mean(h_p)
cvae = Model([x, cond], out)
cvae.summary()
# decoder (for re-user after training, for 1D time series generation)
decoder_input = Input(shape=(latent_dim+n_y, 1))
d_h = de_conv_1(decoder_input)
d_h = upsamp1(d_h)
d_h = de_conv_2(d_h)
d_h = upsamp2(d_h)
d_h = flatten(d_h)
d_h = x_decoded_mean0(d_h)
x_pred = x_decoded_mean(d_h)
decoder = Model(decoder_input, x_pred)
decoder.summary()
epochs = 60
# The number of epochs at which KL loss should be included
klstart = 50
# number of epochs over which KL scaling is increased from 0 to 1
kl_annealtime = 20
class AnnealingCallback(Callback):
def __init__(self, weight):
self.weight = weight
def on_epoch_end (self, epoch, logs={}):
if epoch < klstart :
new_weight = 0.1
K.set_value(self.weight, new_weight)
if epoch > klstart :
new_weight = min(K.get_value(self.weight) + (1./ kl_annealtime), 1.)
K.set_value(self.weight, new_weight)
print ("Current KL Weight is " + str(K.get_value(self.weight)))
# the starting value of weight is 0
weight = K.variable(0.)
# define it as a keras backend variable
# wrap the loss as a function of weight
def vae_loss(weight):
def loss (y_true, y_pred):
# mse loss
reconstruction_loss = K.mean(K.square(y_true - y_pred), axis=[1, 2])
# kl loss
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.mean(kl_loss, axis=-1)
kl_loss *= -0.5 #* 0.001
return K.mean(reconstruction_loss + (weight * kl_loss))
return loss
def KL_loss(y_true, y_pred):
return(0.5* K.mean(K.exp(z_log_var)/(epsilon_std**2) + K.square(z_mean)/(epsilon_std**2) - 1. - z_log_var + np.log(epsilon_std**2), axis=-1))
def recon_loss(y_true, y_pred):
return K.mean(K.square(y_true - y_pred), axis=[1, 2])
## compile vae with the weighted vae loss
vae.compile(optimizer='adam', loss=vae_loss(weight), metrics=[KL_loss, recon_loss])
## train VAE with annealing callback
vae_hist = vae.fit([X_train, y_train], X_train, epochs=epochs, batch_size=batch_size, validation_data = ([X_test, y_test], X_test), callbacks=[AnnealingCallback(weight)])