Производительность конвейера скользящего окна Tensorflow.dataset - PullRequest
0 голосов
/ 23 мая 2019

Я реализую входной конвейер tf.data для задачи классификации временных рядов;сам набор данных представляет собой> 2M упорядоченную по времени запись из 50 объектов;в течение любого заданного времени t модель прогнозирования принимает окно элементов размера t-k, t-k+1, ..., t, чтобы сделать прогноз одного класса для t+1.Обучение проходит более 256 полных наборов данных.

Для наборов с <500 000 num.записи и размер окна 128, очевидное решение состоит в том, чтобы подготовить избыточную встроенную матрицу размером 500000x128x50 и выполнить случайную выборку по первому затемнению.за каждую партию;это не помогает с реальными размерами данных, хотя.</p>

Я вычислил этот конвейер процесса:

import tensorflow as tf
import numpy as np
import time

tf.reset_default_graph()

time_embed = 128
batch_size = 512
num_features = 50
data_size = 100000

shuffle_buffer_size = batch_size * 100

# Input placeholders (ingest numpy arrays):
input_data = tf.placeholder(tf.float32, shape=[None, num_features])
labels = tf.placeholder(tf.float32, shape=[None,])

input_tensors = {'x': input_data, 'y': labels}

# Dictionary of datasets:
ds_struct = {key: tf.data.Dataset.from_tensor_slices(tz) for key, tz in input_tensors.items()}


# Make time embedding windows:
ds_windowed = {
    key: ds.window(time_embed, 1, 1, drop_remainder=True).flat_map(lambda x: x.batch(time_embed))
    for key, ds in ds_struct.items()
}

# Zip into single dataset:
ds = tf.data.Dataset.zip(ds_windowed)

# Note that shapes of tensors are set to be dynamic except last dimension:
print('structured embedded dataset:\n', ds)

ds = ds.shuffle(shuffle_buffer_size)
ds = ds.batch(batch_size)

# Make reinitialisable iterator to feed different data (train, eval, test) into input_tensors:
iterator = ds.make_initializable_iterator()

batch = iterator.get_next()
print('batch:\n', batch)

# For many-to-one prediction discard all but last embedded targets,
# We also need to explicitly reshape x to get static shape sufficient to pass to some estimator:
truncated_batch = {
    'y': batch['y'][:, -1],
    'x': tf.reshape(batch['x'],[-1, time_embed, batch['x'].shape[-1]])
}

print('final batch:\n', truncated_batch)

Один этап запускается на небольшом наборе записей по 100 КБ:


with tf.Session() as sess:
    train_x = np.ones([data_size, num_features])
    train_y = np.ones([data_size])

    # Initialise dataset with data:
    sess.run(iterator.initializer, feed_dict={input_data: train_x, labels: train_y})

    start = time.time()

    while True:
        try:
            o = sess.run(batch)

        except tf.errors.OutOfRangeError as e:
            break
    print('elapsed time for tf.data: ', time.time() - start)


Тестовый прогон показывает, что он занимает около 50второй на моем рабочем столе - итерация за одну эпоху, которая примерно в 6 раз медленнее, чем итерация по предварительно внедренному массиву;

Я буду благодарен за комментарии и предложения по улучшению этой трубы;Может быть, весь мой подход к построению графиков с окном некорректен и есть эффективные альтернативы для получения одинаковых результатов?

Обновление: изменение части кода следующим образом приводит к увеличению скорости примерно на 20% (на самом деле не могу понять, почему):


ds = tf.data.Dataset.zip({key: tf.data.Dataset.from_tensor_slices(tz) for key, tz in input_tensors.items()})

def window_fn(ds):
    ds =  ds.window(time_embed, 1, 1, drop_remainder=True).\
        flat_map(lambda x: tf.data.Dataset.zip({k: v.batch(time_embed) for k, v in x.items()}))
    return ds

ds = ds.apply(window_fn)
...