CycleGAN - artifacts and style not transferred
0 votes
26 January 2019

I am using tf and Keras to build a CycleGAN, following the approach used here and here

I am seeing strange behaviour in the images produced by the A->B and B->A generators.


In the following figure, from left to right:

  • real_A (the original image)
  • generated_B (generator_AtoB applied to real_A)
  • generated_A (generator_BtoA applied to the previous image)

and its counterpart:

  • real_B (the original image)
  • generated_A (generator_BtoA applied to real_B)
  • generated_B (generator_AtoB applied to the previous image)

[image: GAN example]

Images 2 and 5 are the generators applied to the original images, and they show very strong checkerboard artifacts (I assume these come from the deconvolutions) and no sign of any horse-to-zebra "transformation" or vice versa.

What I don't understand is that images 3 and 6 come from the same generators, applied to the "distorted" images, yet they show no sign of these artifacts.

  • Am I doing something horribly wrong when training the generators?

Even after 10k epochs there is no visible improvement:

  • why do images 2 and 5 show no sign of style transfer?
  • why do images 2 and 5 show very strong artifacts, while 3 and 6 do not?
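For reference, the usual remedy for checkerboard artifacts produced by strided Conv2DTranspose layers (see Odena et al., "Deconvolution and Checkerboard Artifacts") is a resize-convolution: nearest-neighbour upsampling followed by a stride-1 convolution. A minimal Keras sketch of such a decoder block (upsample_block is an illustrative helper, not part of the code below):

from keras.models import Sequential
from keras.layers import UpSampling2D, Conv2D

def upsample_block(filters, name=None):
    # nearest-neighbour resize doubles the spatial resolution without the
    # overlapping-kernel pattern that causes checkerboard artifacts
    block = Sequential(name=name)
    block.add(UpSampling2D(size=(2, 2)))
    # a stride-1 convolution then mixes the upsampled features
    block.add(Conv2D(filters, kernel_size=3, strides=1, padding="SAME"))
    return block

Each Conv2DTranspose in the decoder could be swapped for one such block with the same filter count.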


Full code:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

# https://hardikbansal.github.io/CycleGANBlog/
import sys
import time
import pickle
import tensorflow as tf
import numpy as np
import keras
from keras.models import Sequential, Model
from keras.layers import Dense, Flatten, Input, Dropout
from keras.layers import multiply, add as kadd
from keras.layers import Conv2D, BatchNormalization, Conv2DTranspose
from keras.layers import LeakyReLU, ReLU
from keras.layers import Activation

from keras.preprocessing.image import ImageDataGenerator

from PIL import Image


from custom_layers import ReflectionPadding2D


# NET PARAMETERS
ngf = 32 # Number of filters in first layer of generator
ndf = 64 # Number of filters in first layer of discriminator
BATCH_SIZE = 1 # batch_size
pool_size = 50 # pool_size
IMG_WIDTH = 256 # Input image will be of width 256
IMG_HEIGHT = 256 # Input image will be of height 256
IMG_DEPTH = 3 # RGB format
INPUT_SHAPE = (IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH)

USE_IDENTITY_LOSS = False


# TRAINING PARAMETERS
ITERATIONS = 1000000
DISCRIMINATOR_ITERATIONS = 1
SAVE_IMAGES_INTERVAL = 100

SAVE_MODEL_INTERVAL = 1000

FAKE_POOL_SIZE=50

# DATASET="vangogh2photo"
DATASET="horse2zebra"



def resnet_block(num_features):

    # two 3x3 convolutions whose output is added back onto the block input
    # (a standard residual block, as in the CycleGAN generator)
    block = Sequential()
    block.add(Conv2D(num_features, kernel_size=3, strides=1, padding="SAME"))
    block.add(BatchNormalization())
    block.add(ReLU())
    block.add(Conv2D(num_features, kernel_size=3, strides=1, padding="SAME"))
    block.add(BatchNormalization())
    block.add(ReLU())

    # the encoder downsamples 256x256 inputs twice, so the transformed
    # feature maps arriving here are 64x64x256
    resblock_input = Input(shape=(64, 64, 256))
    conv_model = block(resblock_input)

    # residual (skip) connection: input + transformed features
    _sum = kadd([resblock_input, conv_model])

    composed =  Model(inputs=[resblock_input], outputs=_sum)
    return composed


def discriminator( f=4, name=None):

    # PatchGAN-style discriminator: the final 1-filter convolution yields a
    # grid of real/fake scores, one per image patch, rather than one scalar
    d = Sequential()
    d.add(Conv2D(ndf, kernel_size=f, strides=2, padding="SAME", name="discr_"+name+"_conv2d_1"))
    d.add(BatchNormalization())
    d.add(LeakyReLU(0.2))
    d.add(Dropout(0.1))
    d.add(Conv2D(ndf * 2, kernel_size=f, strides=2, padding="SAME", name="discr_"+name+"_conv2d_2"))
    d.add(BatchNormalization())
    d.add(LeakyReLU(0.2))
    d.add(Dropout(0.1))
    d.add(Conv2D(ndf * 4, kernel_size=f, strides=2, padding="SAME", name="discr_"+name+"_conv2d_3"))
    d.add(BatchNormalization())
    d.add(LeakyReLU(0.2))
    d.add(Dropout(0.1))
    d.add(Conv2D(ndf * 8, kernel_size=f, strides=2, padding="SAME", name="discr_"+name+"_conv2d_4"))
    d.add(BatchNormalization())
    d.add(LeakyReLU(0.2))
    d.add(Dropout(0.1))
    d.add(Conv2D(1, kernel_size=f, strides=1, padding="SAME", name="discr_"+name+"_conv2d_out"))
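    # note: no final sigmoid -- the discriminators are trained with an MSE
    # (LSGAN-style) loss on the raw patch scores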

    # d.add(Activation("sigmoid"))


    model_input = Input(shape=INPUT_SHAPE)

    decision  = d(model_input)

    composed = Model(model_input, decision)
    # print(d.output_shape)
    # d.summary()

    return composed

def generator(name=None):

    g = Sequential()
    # ENCODER

    g.add(Conv2D(ngf, kernel_size=7,
            strides=1,
            # activation='relu',
            padding='SAME',
            kernel_initializer='random_normal',
            bias_initializer='zeros',
            input_shape=INPUT_SHAPE,
            name="encoder_"+name+"_0" ))
    # g.add(BatchNormalization())
    # g.add(ReLU())

    # g.add(ReflectionPadding2D())

    g.add(Conv2D(64*2, kernel_size=3,
            strides=2,
            padding='SAME',
            kernel_initializer='random_normal',
            bias_initializer='zeros',
            name="encoder_"+name+"_1" ))
    # g.add(BatchNormalization())
    # g.add(ReLU())
    # output shape = (128, 128, 128)

    # g.add(ReflectionPadding2D())

    g.add(Conv2D(64*4, kernel_size=3,
            strides=2,
            padding="SAME",
            kernel_initializer='random_normal',
            bias_initializer='zeros',
            name="encoder_"+name+"_2",
            ))
    # # g.add(BatchNormalization())
    # # g.add(ReLU())
    # # output shape = (64, 64, 256)

    # # END ENCODER



    # # TRANSFORM

    g.add(resnet_block(64*4))
    g.add(resnet_block(64*4))
    g.add(resnet_block(64*4))
    g.add(resnet_block(64*4))
    g.add(resnet_block(64*4))
    g.add(resnet_block(64*4))



    # # END TRANSFORM
    # # generator.shape = (64, 64, 256)

    # # DECODER

    g.add(Conv2DTranspose(ngf*2,kernel_size=3, strides=2, padding="SAME"))
    # g.add(BatchNormalization())
    # g.add(ReLU())

    g.add(Conv2DTranspose(ngf*2,kernel_size=3, strides=2, padding="SAME"))
    # # g.add(BatchNormalization())
    # # g.add(ReLU())

    g.add(Conv2D(3,kernel_size=7, strides=1, padding="SAME", name="generator_out_layer"))
    g.add(ReLU())
    g.summary()
    # exit()


    # END DECODER

    model_input = Input(shape=INPUT_SHAPE)
    generated_image = g(model_input)

    composed = Model(model_input, generated_image, name=name)
    return composed


def fromMinusOneToOne(x):
    # map RGB values from [0, 255] to [-1, 1]
    return x / 127.5 - 1

def toRGB(x):
    # map values from [-1, 1] back to [0, 255]
    return (1 + x) * 127.5
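
# NOTE: these scaling helpers are currently unused -- createImageGenerator()
# below is always called with its default pp=None, so the networks see raw
# RGB values in [0, 255]. A hypothetical way to wire them in, as in the
# reference CycleGAN implementations that train on [-1, 1] inputs:
#   train_A_image_generator = createImageGenerator("train", "A", pp=fromMinusOneToOne)
# with toRGB() applied to generator outputs before saving previews.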


def createImageGenerator( subset="train", data_type="A", batch_size=1, pp=None):

    # arguments for the Keras ImageDataGenerator (augmentations left disabled)
    data_gen_args = dict(
                         # rescale = 1./127.5,
                         # rotation_range=5.,
                         preprocessing_function= pp,
                         # width_shift_range=0.1,
                         # height_shift_range=0.1,
                         # zoom_range=0.1
                         )

    image_datagen = ImageDataGenerator(**data_gen_args)

    # fixed seed for reproducible shuffling
    seed = 1

    image_directory=subset+data_type
    print('data/'+DATASET+'/'+image_directory)
    image_generator = image_datagen.flow_from_directory(
        'data/'+DATASET+'/'+image_directory,
        class_mode=None,
        batch_size=batch_size,
        seed=seed)

    return image_generator

def fit(
    generator_trainer,
    disc_trainer,
    generator_AtoB,
    generator_BtoA
    ):

    fake_A_pool = []
    fake_B_pool = []


    # "ones"/"zeros" are sized to match the discriminators' patch-output shape
    ones = np.ones((BATCH_SIZE,) + generator_trainer.output_shape[0][1:])
    zeros = np.zeros((BATCH_SIZE,) + generator_trainer.output_shape[0][1:])

    # soften the "real" label (real images are labelled ~0 in this setup)
    zeros = zeros + 0.07

    train_A_image_generator = createImageGenerator("train", "A")
    # print(train_A_image_generator.next())
    # for c in train_A_image_generator:
    #     print(c)
    #     exit()
    # exit()

    train_B_image_generator = createImageGenerator("train", "B")
    # test_A_image_generator = createImageGenerator("test", "A")
    # test_B_image_generator = createImageGenerator("test", "B")

    now = time.strftime("%Y-%m-%d_%H.%M.%S")
    it = 1
    while it  <= ITERATIONS:
        # the summary writer is re-created every iteration; this works, but it
        # could be created once before the loop
        fw = tf.summary.FileWriter(logdir="./tensorboard/"+now)
        start = time.time()
        print("\nIteration %d " % it)
        sys.stdout.flush()

        # THIS ONLY WORKS IF BATCH SIZE == 1
        real_A = train_A_image_generator.next()
        real_B = train_B_image_generator.next()

        fake_A_pool.extend(generator_BtoA.predict(real_B))
        fake_B_pool.extend(generator_AtoB.predict(real_A))

        # keep only the last FAKE_POOL_SIZE fakes: a small history buffer in
        # the spirit of the image pool from the CycleGAN paper (following
        # Shrivastava et al. 2017); sampling discriminator batches from it
        # (below) helps stabilize adversarial training
        fake_A_pool = fake_A_pool[-FAKE_POOL_SIZE:]
        fake_B_pool = fake_B_pool[-FAKE_POOL_SIZE:]

        fake_A = [ fake_A_pool[ind] for ind in np.random.choice(len(fake_A_pool), size=(BATCH_SIZE,), replace=False) ]
        fake_B = [ fake_B_pool[ind] for ind in np.random.choice(len(fake_B_pool), size=(BATCH_SIZE,), replace=False) ]

        fake_A = np.array(fake_A)
        fake_B = np.array(fake_B)





        # label convention here: "real" -> zeros (+0.07), "fake" -> 0.9 --
        # inverted w.r.t. the usual 1/0 labels, but consistent with the
        # generator targets below, so the two LSGAN losses still oppose each other
        for x in range(0, DISCRIMINATOR_ITERATIONS):
            _, D_loss_real_A, D_loss_fake_A, D_loss_real_B, D_loss_fake_B = \
            disc_trainer.train_on_batch(
                [real_A, fake_A, real_B, fake_B],
                [zeros, ones * 0.9, zeros, ones * 0.9] )
                # [zeros, ones, zeros, ones] )


        print("=====")
        print("Discriminator loss:")
        print("Real A: %s, Fake A: %s || Real B: %s, Fake B: %s " % ( D_loss_real_A, D_loss_fake_A, D_loss_real_B, D_loss_fake_B))

        if USE_IDENTITY_LOSS:
            _, G_loss_fake_B, G_loss_fake_A, G_loss_rec_A, G_loss_rec_B, G_loss_id_A, G_loss_id_B = \
                generator_trainer.train_on_batch(
                    [real_A, real_B],
                    [zeros, zeros, real_A, real_B, real_A, real_B])
        else:
            _, G_loss_fake_B, G_loss_fake_A, G_loss_rec_A, G_loss_rec_B = \
                generator_trainer.train_on_batch(
                    [real_A, real_B],
                    [zeros, zeros, real_A, real_B])

            # generator_trainer outputs:
            # [discriminator_generated_B, discriminator_generated_A, cyc_A, cyc_B]




        print("=====")
        print("Generator loss:")

        if USE_IDENTITY_LOSS:
            print("Fake B: %s, Cyclic A: %s || Fake A: %s, Cyclic B: %s || ID A: %s, ID B: %s" % (G_loss_fake_B, G_loss_rec_A, G_loss_fake_A, G_loss_rec_B, G_loss_id_A, G_loss_id_B))
        else:
            print("Fake B: %s, Cyclic A: %s || Fake A: %s, Cyclic B: %s " % (G_loss_fake_B, G_loss_rec_A, G_loss_fake_A, G_loss_rec_B))

        end = time.time()
        print("Iteration time: %s s" % (end-start))
        sys.stdout.flush()

        summary = tf.Summary(value=[
            tf.Summary.Value(tag="disc_A_loss_on_real", simple_value = D_loss_real_A),
            tf.Summary.Value(tag="disc_A_loss_on_generated", simple_value = D_loss_fake_A),
            tf.Summary.Value(tag="disc_B_loss_on_real", simple_value = D_loss_real_B),
            tf.Summary.Value(tag="disc_B_loss_on_generated", simple_value = D_loss_fake_B),

            tf.Summary.Value(tag="gen_generated_A", simple_value = G_loss_fake_A),
            tf.Summary.Value(tag="gen_generated_B", simple_value = G_loss_fake_B),
            tf.Summary.Value(tag="gen_cyc_A", simple_value = G_loss_rec_A),
            tf.Summary.Value(tag="gen_cyc_B", simple_value = G_loss_rec_B),
        ])

        fw.add_summary(summary, global_step=it)
        fw.flush()
        fw.close()

        if not (it % SAVE_IMAGES_INTERVAL ):
            imgA = real_A
            # print(imgA.shape)
            imga2b = generator_AtoB.predict(imgA)
            # print(imga2b.shape)
            imga2b2a = generator_BtoA.predict(imga2b)
            # print(imga2b2a.shape)
            imgB = real_B
            imgb2a = generator_BtoA.predict(imgB)
            imgb2a2b = generator_AtoB.predict(imgb2a)

            c = np.concatenate([imgA, imga2b, imga2b2a, imgB, imgb2a, imgb2a2b], axis=2)
            # clip before casting: astype(np.uint8) wraps out-of-range values,
            # which would itself corrupt the saved previews
            c = np.clip(c, 0, 255).astype(np.uint8)
            # print(c.shape)
            x = Image.fromarray(c[0])
            x.save("data/generated/iteration_%s.jpg" % str(it).zfill(4))



        # with open("models/generator_AtoB.pickle", "wb") as saveFile:
        #     pickle.dump(generator_AtoB, saveFile)

        # with open("models/generator_BtoA.pickle", "wb") as saveFile:
        #     pickle.dump(generator_BtoA, saveFile)

        if not (it % SAVE_MODEL_INTERVAL):
            generator_AtoB.save("models/generator_AtoB_id.h5")
            generator_BtoA.save("models/generator_BtoA_id.h5")

        it+=1



    generator_AtoB.save("models/generator_AtoB_id.h5")
    generator_BtoA.save("models/generator_BtoA_id.h5")


    return

if __name__ == '__main__':

    generator_AtoB = generator(name="gen_A")
    generator_BtoA = generator(name="gen_B")

    discriminator_A = discriminator(name="disc_A")
    discriminator_B = discriminator(name="disc_B")





    ### GENERATOR TRAINING
    optim = keras.optimizers.Adam(lr=0.0002, beta_1=0.5, beta_2=0.999, epsilon=1e-08)

    input_A = Input(batch_shape=(None, IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH), name="input_A")
    generated_B = generator_AtoB(input_A)
    discriminator_generated_B = discriminator_B(generated_B)
    cyc_A = generator_BtoA(generated_B)


    input_B = Input(batch_shape=(None, IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH), name="input_B")
    generated_A = generator_BtoA(input_B)
    discriminator_generated_A = discriminator_A(generated_A )
    cyc_B = generator_AtoB(generated_A)


    # the cyclic (reconstruction) error is weighted up, because it is the more important term
    cyclic_weight_multiplier = 10
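
    # With these weights, the generator objective assembled below is (using
    # this code's convention that "real" is labelled ~0):
    #   L_G = MSE(D_B(G(A)), real) + MSE(D_A(F(B)), real)
    #       + 10 * MAE(F(G(A)), A) + 10 * MAE(G(F(B)), B)
    # i.e. one LSGAN adversarial term per direction plus the two
    # cycle-consistency terms, with lambda = 10 as in the CycleGAN paper.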

    if USE_IDENTITY_LOSS:
        generator_trainer =  Model([input_A, input_B],
                         [discriminator_generated_B,   discriminator_generated_A,
                         cyc_A,      cyc_B,
                         generated_B,     generated_A ]
                         )
        losses =         [ "MSE", "MSE", "MAE",                      "MAE",                      "MAE", "MAE"]
        losses_weights = [ 1,     1,     cyclic_weight_multiplier,   cyclic_weight_multiplier,   1,     1    ]
    else:
        generator_trainer =  Model([input_A, input_B],
                     [discriminator_generated_B,   discriminator_generated_A,
                     cyc_A,      cyc_B,])

        losses =         [ "MSE", "MSE", "MAE",                      "MAE"]
        losses_weights = [ 1,     1,     cyclic_weight_multiplier,   cyclic_weight_multiplier]

    generator_trainer.compile(optimizer=optim, loss = losses, loss_weights=losses_weights)



    ### DISCRIMINATOR TRAINING

    disc_optim = keras.optimizers.Adam(lr=0.0002, beta_1=0.5, beta_2=0.999, epsilon=1e-08)

    real_A = Input(batch_shape=(None, IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH), name="in_real_A")
    real_B = Input(batch_shape=(None, IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH), name="in_real_B")

    generated_A = Input(batch_shape=(None, IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH), name="in_gen_A")
    generated_B = Input(batch_shape=(None, IMG_WIDTH, IMG_HEIGHT, IMG_DEPTH), name="in_gen_B")

    discriminator_real_A = discriminator_A(real_A)
    discriminator_generated_A = discriminator_A(generated_A)
    discriminator_real_B =  discriminator_B(real_B)
    discriminator_generated_B = discriminator_B(generated_B)

    disc_trainer = Model([real_A, generated_A, real_B, generated_B],
                         [  discriminator_real_A,
                            discriminator_generated_A,
                            discriminator_real_B,
                            discriminator_generated_B] )


    disc_trainer.compile(optimizer=disc_optim, loss = 'MSE')


    #########
    ##
    ## TRAINING
    ##
    #########




    fit(generator_trainer,
        disc_trainer,
        generator_AtoB,
        generator_BtoA)
...