Невозможно использовать условный вариационный автоэнкодер для прямого моделирования - PullRequest
0 голосов
/ 28 января 2020

У меня есть Variational Autoencoder (VAE), который я хотел бы преобразовать в Условный Variational Autoencoder (CVAE). Цель состоит в том, чтобы использовать CVAE для пересылки модели моей наблюдаемой (которая задается одномерным временным рядом), как объяснено ниже.

Входные выборки для VAE являются одномерными временными рядами. У меня 2700 из них в моем тренировочном наборе, и 300 из них в моем тестовом наборе. Условная переменная представляет собой набор из трех координат x,y,z. Для каждого триплета x,y,z существует только один возможных 1D временных рядов. Я хотел бы обучить CVAE, чтобы я мог повторно использовать декодер для пересылки модели временного ряда: вводя новые триплеты x,y,z, я хотел бы видеть, как выглядят соответствующие 1D временные ряды.

Проблема состоит в том (я считаю), что мне трудно одновременно уменьшать потери KL и Reconstruction (последний - MAE). Если я позволю, чтобы потери KL и Reconstruction имели одинаковый вес, KL слишком быстро сходится и «заглушает» потери Reconstruction, что не позволяет уменьшить ее выше определенной точки. Если я умножу потери KL на постоянный коэффициент (малый, например, 0,0001), то эта архитектура «работает» как VAE, что означает, что 1-мерные временные ряды, отправленные на вход, корректно восстанавливаются. Это достигается за счет потери KL, которая становится огромной по сравнению с потерями на реконструкцию. Следовательно, эта модель не может быть обучена как CVAE, потому что для ее использования в качестве CVAE потери KL также должны быть уменьшены аналогично потерям на восстановление.

Наконец, если я использую отжиг KL ( как показано ниже в моей реализации), мне удается уменьшить потери как KL, так и Reconstruction, но по какой-то странной причине кажется, что этого недостаточно для работы CVAE: одномерный временной ряд, сгенерированный из заданных координат в набор данных тестирования просто ужасен и не соответствует 1D временному ряду в наборе тестирования.

Это моя реализация:

epsilon_std = 1.0

# dimension of input 
n_x = X_train.shape[1] # 2001 time components for each time series
n_y = y_train.shape[1] # 3 coordinates x,y,z

# 2700 samples for input, 300 for output    
X_train = np.reshape(X_train, [2700, n_x, 1])
X_test = np.reshape(X_test, [300, n_x, 1])
y_train = np.reshape(y_train, [2700, n_y, 1])
y_test = np.reshape(y_test, [300, n_y, 1])

# dimension of latent space (batch size by latent dim)
batch_size = 100
latent_dim = 700

### network parameters
Conv_1 = 8
Conv_2 = 16
Dense_1 = 700
Dense_2 = 400
# Convolution Kernel 
Kernel_1 = 16
Kernel_2 = 64

x = Input(shape=(n_x,1))
cond = Input(shape=(n_y, 1))
inputs = concatenate([x, cond], axis=1)
conv_1 = Conv1D(Conv_1, kernel_size=Kernel_1,
                padding='valid', strides=Stride_1, activation='tanh')(inputs)
maxp1 = MaxPooling1D(2)(conv_1)
conv_2 = Conv1D(Conv_2, kernel_size=Kernel_2,
                padding='valid', strides=Stride_2, activation='tanh')(maxp1)
maxp2 = MaxPooling1D(2)(conv_2)
flatten = Flatten()(maxp2)
hidden = Dense(Dense_1, activation='tanh')(flatten)

z_mean = Dense(latent_dim)(hidden)
z_log_var = Dense(latent_dim)(hidden)

def sampling(args):
    z_mean, z_log_var = args
    epsilon = K.random_normal(shape=(batch_size, latent_dim), 
                              mean=0., stddev=epsilon_std)
    return(z_mean + K.exp(z_log_var/2) * epsilon)

z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
z = Reshape([latent_dim, 1])(z)
z_cond = concatenate([z, cond], axis=1)


de_conv_1 = Conv1D(Conv_2, kernel_size=Kernel_2, 
                   padding='valid', activation='tanh')
upsamp1 = UpSampling1D(2)
de_conv_2 = Conv1D(Conv_1, kernel_size=Kernel_1,
                   padding='valid', activation='tanh')
upsamp2 = UpSampling1D(2)
flatten = Flatten()
x_decoded_mean0 = Dense(n_x)
x_decoded_mean = Reshape([n_x, 1])


# decoder (as part of the CVAE)
h_p = de_conv_1(z_cond)
h_p = upsamp1(h_p)
h_p = de_conv_2(h_p)
h_p = upsamp2(h_p)
h_p = flatten(h_p)
h_p = x_decoded_mean0(h_p)
out = x_decoded_mean(h_p)

cvae = Model([x, cond], out)
cvae.summary()

# decoder (for re-user after training, for 1D time series generation)
decoder_input = Input(shape=(latent_dim+n_y, 1))
d_h = de_conv_1(decoder_input)
d_h = upsamp1(d_h)
d_h = de_conv_2(d_h)
d_h = upsamp2(d_h)
d_h = flatten(d_h)
d_h = x_decoded_mean0(d_h)
x_pred = x_decoded_mean(d_h)

decoder = Model(decoder_input, x_pred)
decoder.summary()

epochs = 60

# The number of epochs at which KL loss should be included
klstart = 50
# number of epochs over which KL scaling is increased from 0 to 1
kl_annealtime = 20
class AnnealingCallback(Callback):
    def __init__(self, weight):
        self.weight = weight
    def on_epoch_end (self, epoch, logs={}):
        if epoch < klstart :
            new_weight = 0.1
            K.set_value(self.weight, new_weight)
        if epoch > klstart :
            new_weight = min(K.get_value(self.weight) + (1./ kl_annealtime), 1.)
            K.set_value(self.weight, new_weight)
        print ("Current KL Weight is " + str(K.get_value(self.weight)))
# the starting value of weight is 0
weight = K.variable(0.)
# define it as a keras backend variable
# wrap the loss as a function of weight
def vae_loss(weight):
    def loss (y_true, y_pred):
        # mse loss
        reconstruction_loss = K.mean(K.square(y_true - y_pred), axis=[1, 2])
        # kl loss
        kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
        kl_loss = K.mean(kl_loss, axis=-1)
        kl_loss *= -0.5 #* 0.001
        return K.mean(reconstruction_loss + (weight * kl_loss))
    return loss
def KL_loss(y_true, y_pred):
    return(0.5* K.mean(K.exp(z_log_var)/(epsilon_std**2) + K.square(z_mean)/(epsilon_std**2) - 1. - z_log_var + np.log(epsilon_std**2), axis=-1))
def recon_loss(y_true, y_pred):
    return K.mean(K.square(y_true - y_pred), axis=[1, 2])

## compile vae with the weighted vae loss
vae.compile(optimizer='adam', loss=vae_loss(weight), metrics=[KL_loss, recon_loss])
## train VAE with annealing callback
vae_hist = vae.fit([X_train, y_train], X_train, epochs=epochs, batch_size=batch_size, validation_data = ([X_test, y_test], X_test), callbacks=[AnnealingCallback(weight)])
...