Я новичок в GAN и нейронных сетях в целом.Я пытаюсь использовать генеративно-состязательные сети для прогнозирования временных рядов, и моя конечная цель также состоит в обнаружении аномалий во временных рядах.Я использую Python и Keras.
Вот код, который я использую для прогнозирования временных рядов, однако результат, который я получаю с помощью GAN, немного для меня непонятен, и я думаю, что он нуждается в некотором улучшении.
Данные временных рядов, которые я использую, являются популярными данными временных рядов Air-Passangers.
Спасибо за вашу помощь.
from __future__ import print_function, division
from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
#import matplotlib.pyplot as plt
from matplotlib import pyplot as plt
import pandas as pd
import sys
import numpy as np
class GAN():
def __init__(self):
self.data_rows = 1
self.data_cols = 1
self.data_shape = (self.data_rows, self.data_cols)
self.latent_dim = 48
optimizer = Adam(0.0002, 0.5)
self.discriminator = self.build_discriminator()
self.discriminator.compile(loss='binary_crossentropy',
optimizer=optimizer,
metrics=['accuracy'])
self.generator = self.build_generator()
z = Input(shape=(self.latent_dim,))
data = self.generator(z)
self.discriminator.trainable = False
validity = self.discriminator(data)
self.combined = Model(z, validity)
self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)
def build_generator(self):
model = Sequential()
model.add(Dense(256, input_dim=self.latent_dim))
model.add(LeakyReLU(alpha=0.2))
model.add(BatchNormalization(momentum=0.8))
model.add(Dense(512))
model.add(LeakyReLU(alpha=0.2))
model.add(BatchNormalization(momentum=0.8))
model.add(Dense(1024))
model.add(LeakyReLU(alpha=0.2))
model.add(BatchNormalization(momentum=0.8))
model.add(Dense(np.prod(self.data_shape), activation='linear'))
model.add(Reshape(self.data_shape))
model.summary()
noise = Input(shape=(self.latent_dim,))
data = model(noise)
return Model(noise, data)
def build_discriminator(self):
model = Sequential()
model.add(Flatten(input_shape=self.data_shape))
model.add(Dense(512))
model.add(LeakyReLU(alpha=0.2))
model.add(Dense(256))
model.add(LeakyReLU(alpha=0.2))
model.add(Dense(1, activation='sigmoid'))
model.summary()
data = Input(shape=self.data_shape)
validity = model(data)
return Model(data, validity)
def train(self, epochs, batch_size=128, sample_interval=50):
df = pd.read_csv("AirPassengers.csv")
ts = df[["#Passengers"]]
X_train = ts.as_matrix()
# Rescale -1 to 1
#X_train = X_train / 127.5 - 1.
X_train = np.expand_dims(X_train, axis=3)
print("X_train")
print(X_train.shape)
#print(X_train)
valid = np.ones((batch_size, 1))
fake = np.zeros((batch_size, 1))
for epoch in range(epochs):
idx = np.random.randint(0, X_train.shape[0], batch_size)
data_s = X_train[idx]
noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
gen_data = self.generator.predict(noise)
d_loss_real = self.discriminator.train_on_batch(data_s, valid)
d_loss_fake = self.discriminator.train_on_batch(gen_data, fake)
d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
# ---------------------
# Train Generator
# ---------------------
noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
# Train the generator (to have the discriminator label samples as valid)
g_loss = self.combined.train_on_batch(noise, valid)
print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))
if epoch % sample_interval == 0:
self.plot_gan_result(epoch, batch_size)
c = X_train.reshape(144, 1)
fig, axs = plt.subplots()
axs.plot(c, color = "blue", label = 'true')
def plot_gan_result(self, epoch, batch_size):
noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
gen_data = self.generator.predict(noise)
b = gen_data.reshape(24, 1)
fig, axs = plt.subplots()
print("noise shape")
print(noise.shape)
print(noise[0])
axs.plot(b, color = "red", label = 'generated')
if __name__ == '__main__':
gan = GAN()
gan.train(epochs=30, batch_size=24, sample_interval=200)