Как передать .h5 файлы в конвейер tf.data в модели tenorflow - PullRequest
0 голосов
/ 26 марта 2019

Я пытаюсь оптимизировать входной конвейер для данных .h5 с помощью tf.data. Но я столкнулся с TypeError: expected str, bytes or os.PathLike object, not Tensor. Я провел исследование, но не могу найти ничего о преобразовании тензора строки в строку.

Этот упрощенный код является исполняемым и возвращает ту же ошибку:

batch_size = 1000
conv_size = 3
nb_conv = 32
learning_rate = 0.0001

# define parser function
def parse_function(fname):
    with h5py.File(fname, 'r') as f: #Error comes from here
        X = f['X'].reshape(batch_size, patch_size, patch_size, 1)
        y = f['y'].reshape(batch_size, patch_size, patch_size, 1)
        return X, y

# create a list of files path
flist = []
for dirpath, _, fnames in os.walk('./proc/'):
    for fname in fnames:
        if fname.startswith('{}_{}'.format(patch_size, batch_size)) and fname.endswith('h5'):
            flist.append(fname)

# prefetch data
dataset = tf.data.Dataset.from_tensor_slices((flist))
dataset = dataset.shuffle(len(flist))
dataset = dataset.map(parse_function, num_parallel_calls=4)
dataset = dataset.batch(1)
dataset = dataset.prefetch(3)

# simplest model that I think of
X_ph = tf.placeholder(tf.float32, shape=None)
y_ph = tf.placeholder(tf.float32, shape=None)
W = tf.get_variable('w', shape=[conv_size, conv_size, 1, 1], initializer=tf.contrib.layers.xavier_initializer())
loss = tf.reduce_mean(tf.losses.mean_squared_error(tf.nn.softmax(labels=y_ph, predictions=tf.matmul(X_ph, W))))
train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)

# start session
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(train_op, feed_dict={X_ph: dataset[0], y_ph: dataset[1]}))

Очевидно, fname является тензором строки, но позиционный аргумент ожидает только строку. Я не могу найти документацию по этому вопросу. И ответ другого поста не решает эту проблему. В моем случае я работаю только с h5, где один h5 хранит одну партию.


Обновление решения: Благодаря комментарию @kvish, часть загрузки файла .h5 решена. Код обновлен простым простым уровнем, заполнители заняты. Каждый .h5 - это один пакет. Я хочу выполнять предварительную выборку в параллельных нескольких пакетах (h5py не поддерживает многопоточное чтение, поэтому я записываю пакеты в несколько файлов). Можно копировать-вставить-и запустить :

import h5py
import threading
import numpy as np
import tensorflow as tf

# generate some img data
for i in range(5):
    with h5py.File('./test_{}.h5'.format(i), 'w') as f:
        f.create_dataset('X', shape=(1000, 100, 100), dtype='float32', data=np.random.rand(10**7).reshape(1000, 100, 100))
        f.create_dataset('y', shape=(1000, 100, 100), dtype='float32', data=np.random.rand(10**7).reshape(1000, 100, 100))
        print(threading.get_ident())

# params
num_cores = 3
shuffle_size = 1
batch_size = 1

# read .h5 file
def parse_file(f):
    print(f.decode('utf-8'))
    with h5py.File(f.decode("utf-8"), 'r') as fi:
        X = fi['X'][:].reshape(1000, 100, 100, 1)
        y = fi['y'][:].reshape(1000, 100, 100, 1)
        print(threading.get_ident())  # to see the thread id
        return X, y

# py_func wrapper
def parse_file_tf(filename):
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

# tf.data input pipeline
files = tf.data.Dataset.list_files('./test_*.h5')
dataset = files.map(parse_file_tf, num_parallel_calls=num_core)
dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(3)
it = dataset.make_initializable_iterator()
iter_init_op = it.initializer
X_it, y_it = it.get_next()

# simplest model that I can think of 
with tf.name_scope("Conv1"):
    W = tf.get_variable("W", shape=[3, 3, 1, 1],
                         initializer=tf.contrib.layers.xavier_initializer())
    b = tf.get_variable("b", shape=[1], initializer=tf.contrib.layers.xavier_initializer())
    layer1 = tf.nn.conv2d(X_it, W, strides=[1, 1, 1, 1], padding='SAME') + b
    out = tf.nn.relu(layer1)

loss = tf.reduce_mean(tf.losses.mean_squared_error(labels=y_it, predictions=out))
train_op = tf.train.AdamOptimizer(learning_rate=0.0001).minimize(loss)

# session
sess = tf.Session()
sess.run(tf.global_variables_initializer())
sess.run(iter_init_op)
sess.run([train_op])
sess.close()

Каким-то образом возникнет еще одна проблема cudnn, не связанная с этим сообщением.

Тензор-процессор v1.12: отлично работает

tenorflow-gpu v1.12: время выполнения проблема возникает

Traceback (последний вызов был последним): File "/Envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py", строка 1334, в _do_call вернуть файл fn (* args) "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py", строка 1319, в _run_fn options, feed_dict, fetch_list, target_list, run_metadata) Файл "/envs/tf/lib/python3.6/site-packages/tensorflow/python/client/session.py", строка 1407, в _call_tf_sessionrun run_metadata) tenorflow.python.framework.errors_impl.NotFoundError: без алгоритма работал! [[{{узел Conv1 / Conv2D}} = Conv2D [T = DT_FLOAT, data_format = "NCHW", расширения = [1, 1, 1, 1], padding = "SAME", stepdes = [1, 1, 1, 1], use_cudnn_on_gpu = true, _device = "/ работа: локальная / реплика: 0 / задача: 0 / устройства: ГПУ: 0"] (градиенты / Conv1 / Conv2D_grad / Conv2DBackpropFilter-0-TransposeNHWCToNCHW-LayoutOptimizer, W / read)]] [[{{узел mean_squared_error / num_present / broadcast_weights / assert_broadcastable / AssertGuard / Утверждай / Switch_2 / _37}} = _Recvclient_terminated = false, recv_device = "/ job: localhost / replica: 0 / task: 0 / device: CPU: 0", send_device = "/ работа: локальный / реплика: 0 / задача: 0 / устройства: GPU: 0", send_device_incarnation = 1, тензор_имя = "edge_63_me ... t / Switch_2", tensor_type = DT_INT32, _device = "/ работа: локальный / реплика: 0 / задача: 0 / Устройство: ЦП: 0"]] Тензор-процессор v1.12: отлично работает!

1 Ответ

1 голос
/ 27 марта 2019

Вот пример того, как вы можете обернуть функцию с помощью py_func . Обратите внимание, что это устарело в TF V2. Вы можете следить за документацией для получения более подробной информации.

def parse_function_wrapper(filename):
   # Assuming your data and labels are float32
   # Your input is parse_function, who arg is filename, and you get X and y as output
   # whose datatypes are indicated by the tuple argument  
   features, labels = tf.py_func(
       parse_function, [filename], (tf.float32, tf.float32)) 
   return features, labels

# Create dataset of filenames.
dataset = tf.data.Dataset.from_tensor_slices(flist)
dataset = dataset.shuffle(len(flist))
dataset = dataset.map(parse_function_wrapper)
...