Я реализую входной конвейер tf.data для задачи классификации временных рядов;сам набор данных представляет собой> 2M упорядоченную по времени запись из 50 объектов;в течение любого заданного времени t
модель прогнозирования принимает окно элементов размера t-k, t-k+1, ..., t
, чтобы сделать прогноз одного класса для t+1
.Обучение проходит более 256 полных наборов данных.
Для наборов с <500 000 num.записи и размер окна 128, очевидное решение состоит в том, чтобы подготовить избыточную встроенную матрицу размером 500000x128x50 и выполнить случайную выборку по первому затемнению.за каждую партию;это не помогает с реальными размерами данных, хотя.</p>
Я вычислил этот конвейер процесса:
import tensorflow as tf
import numpy as np
import time
tf.reset_default_graph()
time_embed = 128
batch_size = 512
num_features = 50
data_size = 100000
shuffle_buffer_size = batch_size * 100
# Input placeholders (ingest numpy arrays):
input_data = tf.placeholder(tf.float32, shape=[None, num_features])
labels = tf.placeholder(tf.float32, shape=[None,])
input_tensors = {'x': input_data, 'y': labels}
# Dictionary of datasets:
ds_struct = {key: tf.data.Dataset.from_tensor_slices(tz) for key, tz in input_tensors.items()}
# Make time embedding windows:
ds_windowed = {
key: ds.window(time_embed, 1, 1, drop_remainder=True).flat_map(lambda x: x.batch(time_embed))
for key, ds in ds_struct.items()
}
# Zip into single dataset:
ds = tf.data.Dataset.zip(ds_windowed)
# Note that shapes of tensors are set to be dynamic except last dimension:
print('structured embedded dataset:\n', ds)
ds = ds.shuffle(shuffle_buffer_size)
ds = ds.batch(batch_size)
# Make reinitialisable iterator to feed different data (train, eval, test) into input_tensors:
iterator = ds.make_initializable_iterator()
batch = iterator.get_next()
print('batch:\n', batch)
# For many-to-one prediction discard all but last embedded targets,
# We also need to explicitly reshape x to get static shape sufficient to pass to some estimator:
truncated_batch = {
'y': batch['y'][:, -1],
'x': tf.reshape(batch['x'],[-1, time_embed, batch['x'].shape[-1]])
}
print('final batch:\n', truncated_batch)
Один этап запускается на небольшом наборе записей по 100 КБ:
with tf.Session() as sess:
train_x = np.ones([data_size, num_features])
train_y = np.ones([data_size])
# Initialise dataset with data:
sess.run(iterator.initializer, feed_dict={input_data: train_x, labels: train_y})
start = time.time()
while True:
try:
o = sess.run(batch)
except tf.errors.OutOfRangeError as e:
break
print('elapsed time for tf.data: ', time.time() - start)
Тестовый прогон показывает, что он занимает около 50второй на моем рабочем столе - итерация за одну эпоху, которая примерно в 6 раз медленнее, чем итерация по предварительно внедренному массиву;
Я буду благодарен за комментарии и предложения по улучшению этой трубы;Может быть, весь мой подход к построению графиков с окном некорректен и есть эффективные альтернативы для получения одинаковых результатов?
Обновление: изменение части кода следующим образом приводит к увеличению скорости примерно на 20% (на самом деле не могу понять, почему):
ds = tf.data.Dataset.zip({key: tf.data.Dataset.from_tensor_slices(tz) for key, tz in input_tensors.items()})
def window_fn(ds):
ds = ds.window(time_embed, 1, 1, drop_remainder=True).\
flat_map(lambda x: tf.data.Dataset.zip({k: v.batch(time_embed) for k, v in x.items()}))
return ds
ds = ds.apply(window_fn)