TF 2.0 custom training loop performs significantly worse than keras fit_generator - can't figure out why
0 votes
/ 09 May 2019

Trying to get a better understanding of TensorFlow 2.0, I am writing a custom training loop to replicate what the keras fit_generator function does. In my head I have replicated the steps fit_generator takes to train my network, but that is clearly not what is happening, since the network trains significantly better with fit_generator than with my loop (my loop: MAE ~3.0, keras fit_generator: MAE ~2.0).

I have tried training both versions on a single sample and both are able to overfit it. I have tried several different optimizers and loss metrics, but the discrepancy persists. While I do use a custom generator, I use the same custom generator in both cases with the results described above, so I don't believe it is the cause of the problem (although it is possible). The dataset I am using is a downsampled version of https://www.kaggle.com/c/LANL-Earthquake-Prediction/data, but the issue can probably be reproduced with other data as well.
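For what it's worth, this is the kind of side-by-side check I have in mind (a minimal, self-contained sketch with a toy model and random data, not the actual CRNN and generators posted below): one manual gradient step versus train_on_batch on a compiled copy, starting from identical weights.

import tensorflow as tf

# Toy data and model, purely illustrative -- not the real CRNN or generator below.
x = tf.random.normal((64, 100, 4))
y = tf.random.normal((64,))

def build_model():
    return tf.keras.Sequential([
        tf.keras.layers.GRU(8, input_shape=(100, 4)),
        tf.keras.layers.Dense(1),
    ])

m1 = build_model()                 # takes the manual step
m2 = build_model()                 # takes the compiled / train_on_batch step
m2.set_weights(m1.get_weights())   # start both from identical weights

# Path 1: one manual gradient step, as in my custom loop.
mae = tf.keras.losses.MeanAbsoluteError()
opt = tf.keras.optimizers.SGD(learning_rate=1e-3)
with tf.GradientTape() as tape:
    loss1 = mae(y, m1(x))
opt.apply_gradients(zip(tape.gradient(loss1, m1.trainable_variables),
                        m1.trainable_variables))

# Path 2: the same batch through the compiled copy, as fit_generator would do.
m2.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3), loss='mae')
loss2 = m2.train_on_batch(x, y)

print(float(loss1), float(loss2))  # I would expect these to match

The full script and the generator module are below.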

import tensorflow as tf
import numpy as np
import os
import pandas as pd
import time
from sklearn import preprocessing
import shutil
import my_classes_tf

# Import data and massage
os.chdir('/home/aj/Data/LANL-Earthquake-Prediction')
# cv_indices = pd.read_csv('./Current Data/cv_assignments.csv', delimiter=',', header=None).values.astype('int16')
evaluation_indices = pd.read_csv('./Current Data/Validation Indices Original.csv', delimiter=',', header=None).values.astype('int64')
eval_index, cv_index = np.hsplit(evaluation_indices, 2)
train = pd.read_csv('./Current Data/NewFeatures.csv', delimiter=',', header=None).values.astype('float32')
train_data, other_info = np.hsplit(train, 2)
targets, OG_row, EQ_ind, CV_ind = np.hsplit(other_info, 4)
targets = targets.astype('float16')
OG_row = OG_row.astype('int64')
EQ_ind = EQ_ind.astype('int64')
CV_ind = CV_ind.astype('int64')
mod_eval = pd.read_csv('./Current Data/Validation Indices Modified.csv', delimiter=',', header=None).values.astype('int64')
mod_eval_index, mod_cv_index, _, _ = np.hsplit(mod_eval, 4)

logtrain = pd.read_csv('./Current Data/NewFeatures_logtransformed.csv', delimiter=',', header=None).values.astype('float32')

log_std, log_skew, log_kurt, log_sixth, _, _, _ = np.hsplit(logtrain, 7)
train_data_logs = np.concatenate((log_std, log_skew, log_kurt, log_sixth), axis=1)

del logtrain, log_std, log_skew, log_kurt, log_sixth, other_info


def safe_mkdir(path):
    """ Create a directory if there isn't one already. """
    try:
        os.mkdir(path)
    except OSError:
        pass


def del_dir(name):
    if os.path.isdir('./Saved Models/{}'.format(name)):
        shutil.rmtree('./Saved Models/{}'.format(name))
    if os.path.isdir('./Error Plots/{}'.format(name)):
        shutil.rmtree('./Error Plots/{}'.format(name))
    if os.path.isdir('./Train and Test Losses/{}'.format(name)):
        shutil.rmtree('./Train and Test Losses/{}'.format(name))


fold = 1
boolz = CV_ind != fold
cv_train = train_data_logs[boolz.reshape(-1)]
cv_targets = targets[boolz.reshape(-1)]
cv_eqs = EQ_ind[boolz.reshape(-1)]

scaler = preprocessing.StandardScaler().fit(cv_train)
cv_train = scaler.transform(cv_train)
cv_val = scaler.transform(train_data_logs)

batch_size = 64
lookback = 14995
offset = 15000

if np.max(mod_eval_index) > len(train_data_logs):  # Prevents dividing twice by accident when re-running the code
    mod_eval_index = mod_eval_index // 10
train_gen = my_classes_tf.DataGenerator(data=cv_train,
                                        targets=cv_targets,
                                        indices=cv_eqs,
                                        min_index=0,
                                        max_index=None,
                                        batch_size=batch_size,
                                        lookback=lookback,
                                        offset=offset,
                                        shuffle_start=True,
                                        shuffle_feed=True)

val_gen = my_classes_tf.ValDataGenerator(data=cv_val,
                                         targets=targets,
                                         eval_index=mod_eval_index,
                                         cv_index=mod_cv_index,
                                         cv=fold,
                                         batch_size=batch_size,
                                         lookback=lookback)


class CRNN(tf.keras.Model):
    def __init__(self):
        super(CRNN, self).__init__()
        # Consider LocallyConnected1D
        self.conv1 = tf.keras.layers.Conv1D(filters=32, kernel_size=50, strides=1, padding='same',
                                            activation=None, kernel_initializer='he_uniform', name='conv1a')
        self.pool1 = tf.keras.layers.MaxPool1D(pool_size=100, strides=None, name='pool1')
        self.gru1 = tf.keras.layers.GRU(units=32, name='gru1')
        self.dense1 = tf.keras.layers.Dense(units=16, activation=None, name='dense1')
        self.output1 = tf.keras.layers.Dense(units=1, activation='relu', name='output1')
        self.lrelu = tf.keras.layers.LeakyReLU(alpha=0.1)
        self.mae = tf.keras.losses.MeanAbsoluteError()
        self.optimizer = tf.keras.optimizers.SGD(lr=1e-3, momentum=0, nesterov=True)

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.lrelu(x)
        x = self.pool1(x)
        x = self.gru1(x)
        x = self.dense1(x)
        x = self.lrelu(x)
        return self.output1(x)

    def train_step(self, sample, label):
        with tf.GradientTape() as tape:
            predictions = self.call(sample)
            loss = self.mae(label, predictions)
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.train_loss(loss)

    def eval_once(self, sample, label):
        predictions = self.call(sample)
        loss = self.mae(label, predictions)
        self.eval_loss(loss)

    def train(self, num_epochs):
        self.train_loss = tf.keras.metrics.Mean(name='train_loss')
        self.eval_loss = tf.keras.metrics.Mean(name='eval_loss')
        self.store_gradients = np.empty((num_epochs, ))
        for epoch in range(num_epochs):
            start_time = time.time()
            self.train_loss.reset_states()
            self.eval_loss.reset_states()
            for samples, labels in train_gen:
                self.train_step(samples, labels)
            train_gen.on_epoch_end()
            for samples, labels in val_gen:
                self.eval_once(samples, labels)
            print('Epoch: {0}, Time: {1:.2f}, Train Loss: {2:.2f}, Test Loss: {3:.2f}'.format(epoch + 1,
                                                                                              time.time() - start_time,
                                                                                              self.train_loss.result(),
                                                                                              self.eval_loss.result()))


tf.keras.backend.clear_session()
model = CRNN()
model.train(20)

model2 = CRNN()
model2.compile(optimizer=tf.keras.optimizers.SGD(lr=1e-3, momentum=0, nesterov=True),
               loss='mae')

history = model2.fit_generator(generator=train_gen,
                               validation_data=val_gen,
                               epochs=20,
                               workers=1,
                               use_multiprocessing=False,
                               verbose=2,
                               callbacks=[])

# https://github.com/tensorflow/tensorflow/blob/r2.0/tensorflow/python/keras/engine/training_eager.py
# Check this ^ to see what is different between keras fit_generator and your fit
model3 = CRNN()
model3.compile(optimizer=model3.optimizer,
               loss=model3.mae)
history3 = model3.fit_generator(generator=train_gen,
                                validation_data=val_gen,
                                epochs=20,
                                workers=1,
                                use_multiprocessing=False,
                                verbose=2,
                                callbacks=[])
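
To rule out the generator and the model classes themselves, a check that can be run after the script above (sketch only; it reuses the CRNN class and train_gen defined here and copies weights so both fresh models start identical) is to push a single batch through both training paths:

# Sketch: one gradient step through the custom train_step vs. the compiled path,
# using the same batch and the same starting weights.
samples, labels = train_gen[0]

check_a = CRNN()
check_b = CRNN()
_ = check_a(samples)                      # build the variables of both models
_ = check_b(samples)
check_b.set_weights(check_a.get_weights())
check_b.compile(optimizer=tf.keras.optimizers.SGD(lr=1e-3, momentum=0, nesterov=True),
                loss='mae')

check_a.train_loss = tf.keras.metrics.Mean(name='train_loss')  # train_step expects this metric
check_a.train_step(samples, labels)
loss_compiled = check_b.train_on_batch(samples, labels)

print('custom step loss:   {:.4f}'.format(float(check_a.train_loss.result())))
print('compiled step loss: {:.4f}'.format(float(loss_compiled)))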

my_classes_tf.py:


import tensorflow as tf
import numpy as np
import random


class ValDataGenerator(tf.keras.utils.Sequence):
    """Generates data"""
    def __init__(self, data, targets, eval_index, cv_index, cv, batch_size, lookback):
        self.data = data
        self.data_width = self.data.shape[1]
        self.targets = targets
        self.eval_index = eval_index
        self.cv_index = cv_index
        self.cv = cv
        self.batch_size = batch_size
        self.lookback = lookback
        self.row_master = self.eval_index[self.cv_index == self.cv]

    def __len__(self):
        """Denotes number of batches per epoch. Cuts off after max_index is reached."""
        return len(self.eval_index[self.cv_index == self.cv])//self.batch_size + 1

    def __getitem__(self, index):
        """
        Returns a batch
        rows marks the ending index of each sample within data for a batch
        """
        rows = self.row_master[index * self.batch_size:(index + 1) * self.batch_size]
        samples, label = self.__data_generation(rows)
        return samples, label

    def __data_generation(self, rows):
        """Generates one batch of data samples and targets"""
        samples = np.empty((len(rows), self.lookback, self.data_width)).astype('float32')
        label = np.empty(len(rows)).astype('float32')
        for j in range(len(rows)):
            samples[j, ] = self.data[(rows[j] - self.lookback):rows[j]]
            label[j] = self.targets[rows[j]]
        return samples, label


class DataGenerator(tf.keras.utils.Sequence):
    """Generates data"""
    def __init__(self, data, targets, indices, batch_size, min_index=0, max_index=None,
                 lookback=149950, offset=150000, shuffle_start=True, shuffle_feed=True):
        if max_index is None:
            self.max_index = len(data)
        else:
            self.max_index = max_index
        self.data = data[min_index:self.max_index].astype('float32')
        self.data_width = self.data.shape[1]
        self.targets = targets[min_index:self.max_index].astype('float32')
        self.indices = indices[min_index:self.max_index]
        self.batch_size = batch_size
        self.lookback = lookback
        self.offset = offset
        self.shuffle_start = shuffle_start
        self.shuffle_feed = shuffle_feed
        self.epoch_start = self.lookback+5
        self.pre_len = (self.max_index - min_index + self.offset - self.lookback) // (self.batch_size * self.offset)
        self.row_master = list(range(self.epoch_start, self.epoch_start + self.pre_len * self.batch_size * self.offset, self.offset))  # indices in data of all samples
        self.on_epoch_end()

    def __len__(self):
        """Denotes number of batches per epoch. Cuts off after max_index is reached."""
        return len(self.row_master) // self.batch_size + 1

    def __getitem__(self, index):
        """
        Returns a batch
        rows marks the ending index of each sample within data for a batch
        """
        rows = self.row_master[index * self.batch_size:(index + 1) * self.batch_size]
        samples, labels = self.__data_generation(rows)
        return samples, labels

    def on_epoch_end(self):
        """If shuffle is true, then we start from a new initial index"""
        self.epoch_start = self.lookback+5
        if self.shuffle_start:
            self.epoch_start += random.randint(0, self.offset)
            self.row_master = list(range(self.epoch_start, self.epoch_start + self.pre_len * self.batch_size * self.offset, self.offset))
        # if self.perform_os is not None:
        #     self.over_sample()
        self.adjust_cross_eqs()
        if self.shuffle_feed:
            np.random.shuffle(self.row_master)

    def adjust_cross_eqs(self):
        """Deletes samples that have an earthquake occur during them to occur later, so that an EQ does not occur within it."""
        del_list = []
        for i, row in enumerate(self.row_master):
            if self.indices[row] != self.indices[row - self.lookback + 1]:
                del_list.append(i)
        self.row_master = np.delete(self.row_master, del_list)

    def __data_generation(self, rows):
        """Generates one batch of data samples and targets"""
        samples = np.empty((len(rows), self.lookback, self.data_width)).astype('float32')
        labels = np.empty(len(rows)).astype('float32')
        for j in range(len(rows)):
            samples[j, ] = self.data[(rows[j] - self.lookback):rows[j]]
            labels[j] = self.targets[rows[j]]
        return samples, labels


class One_Sample_Only_DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data, targets, lookback):
        self.data = data
        self.targets = targets
        self.lookback = lookback
        self.epoch_start = self.lookback+5
        self.data_width = self.data.shape[1]

    def __len__(self):
        return 1

    def __getitem__(self, index):
        samples = self.data[self.epoch_start - self.lookback: self.epoch_start].reshape(1, self.lookback, self.data_width)
        labels = self.targets[self.epoch_start]
        return samples, labels





I would expect the training losses from my training loop and from keras fit_generator to be the same. Am I missing something obvious here, or is there a bug? Let me know if I forgot to post something important!
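
Addendum, in case it matters: this is how I can inspect what the generators actually emit and what the model returns for one batch (sketch; it assumes the main script above has been run so train_gen, val_gen and model exist):

xb, yb = train_gen[0]
xv, yv = val_gen[0]
print(xb.shape, xb.dtype, yb.shape, yb.dtype)   # expecting something like (64, 14995, 4) float32 and (64,) float32
print(xv.shape, xv.dtype, yv.shape, yv.dtype)
print(model(xb).shape)                          # shape of the predictions for the same batch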
