распараллеливание tf.data.Dataset.from_generator с TF2.1 - PullRequest
0 голосов
/ 07 февраля 2020

Это уже 2 сообщения на эту тему, но они не были обновлены для последнего выпуска TF2.1 ...

Короче говоря, у меня есть много изображений tif для чтения и анализа с указанным c конвейером.

import tensorflow as tf
import numpy as np

files = # a list of str
labels = # a list of int
n_unique_label = len(np.unique(labels))

gen = functools.partial(generator, file_list=files, label_list=labels, param1=x1, param2=x2)
dataset = tf.data.Dataset.from_generator(gen, output_types=(tf.float32, tf.int32))
dataset = dataset.map(lambda b, c: (b, tf.one_hot(c, depth=n_unique_label)))

Эта обработка работает хорошо. Тем не менее, мне нужно распараллелить часть анализа файла, пробуя следующее решение:

files = # a list of str
files = tensorflow.data.Dataset.from_tensor_slices(files)

def wrapper(file_path):
    parser = partial(tif_parser, param1=x1, param2=x2)
    return tf.py_function(parser, inp=[file_path], Tout=[tf.float32])

dataset = files.map(wrapper, num_parallel_calls=2)

Разница в том, что я анализирую один файл за раз здесь с помощью функции parser. Однако, но это не работает:

  File "loader.py", line 643, in tif_parser
    image = numpy.array(Image.open(file_path)).astype(float)

  File "python3.7/site-packages/PIL/Image.py", line 2815, in open
    fp = io.BytesIO(fp.read())

AttributeError: 'tensorflow.python.framework.ops.EagerTensor' object has no attribute 'read'


     [[{{node EagerPyFunc}}]] [Op:IteratorGetNextSync]

Насколько я понимаю, функция tif_parser получает не строку, а (неоцененный) тензор. В настоящее время эта функция довольно проста:

def tif_parser(file_path, param1=1, param2=2):
    image = numpy.array(Image.open(file_path)).astype(float)
    image /= 255.0

    return image

1 Ответ

0 голосов
/ 14 февраля 2020

Вот как я поступил

dataset = tf.data.Dataset.from_tensor_slices((files, labels))

def wrapper(file_path, label):
    import functools
    parser = functools.partial(tif_parser,  param1=x1, param2=x2)
    return tf.data.Dataset.from_generator(parser, (tf.float32, tf.int32), args=(file_path, label))

dataset = dataset.interleave(wrapper, cycle_length=tf.data.experimental.AUTOTUNE)

# The labels are converted to 1-hot vectors, could be integrated in tif_parser
dataset = dataset.map(lambda i, l: (i, tf.one_hot(l, depth=unique_label_count)))

dataset = dataset.shuffle(buffer_size=file_count, reshuffle_each_iteration=True)
dataset = dataset.batch(batch_size=batch_size, drop_remainder=False)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

Конкретно, я генерирую набор данных каждый раз, когда вызывается парсер. Парсер запускается cycle_length раз при каждом вызове, это означает, что cycle_length изображения читаются одновременно. Это подходит для моего конкретного случая c, потому что я не могу загрузить все изображения в память. Я не уверен, правильно ли используется предварительная выборка или нет.

...