Как улучшить производительность конвейера ввода данных? - PullRequest
3 голосов
/ 19 сентября 2019

Я пытаюсь оптимизировать мой конвейер ввода данных.Набор данных представляет собой набор из 450 файлов TFRecord размером ~ 70 МБ каждый, размещенных в GCS.Работа выполнена с GCP ML Engine.Графического процессора нет.

Вот конвейер:

def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )

С отображенной функцией:

def _bit_to_float(string_batch: tf.Tensor):
    return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
        tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
        tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
    ), tf.float32), 2), (tf.shape(string_batch)[0], -1))


def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )

Я пытался следовать рекомендациям учебное пособие по конвейеру данных и векторизация моей отображенной функции (в соответствии с рекомендациями mrry ).

С этими настройками данные загружаются с высокой скоростью (пропускная способность составляет около 200 МБ /s) процессор используется недостаточно (14%), и обучение идет очень медленно (более 1 часа в течение эпохи).

Я попытался настроить некоторые параметры, изменив аргументы interleave(), такие как num_parallel_calls илиcycle_length или TFRecordDataset аргументы типа num_parallel_calls.

В самой быстрой конфигурации используется этот набор параметров:

  • interleave.num_parallel_calls: 1
  • interleave.cycle_length: 8
  • TFRecordDataset.num_parallel_calls: 8

При этом на одну эпоху уходит всего ~ 20 минут. Однако загрузка ЦП составляет всего 50%, а потребление полосы пропускания составляет около 55 МБ / с

Вопросы:

  1. Как оптимизировать конвейер для достижения 100% ЦПиспользование (и что-то вроде 100 МБ / с потребления полосы пропускания)?
  2. Почему tf.data.experimental.AUTOTUNE не находит наилучшего значения для ускорения обучения?

Добрый, Алексис.


Редактировать

После еще нескольких экспериментов я пришел к следующему решению:

  1. Удалите шаг interleave, который уже обрабатывается TFRecordDataset, если num_parallel_calls больше 0.
  2. Обновите сопоставленную функцию, чтобы она выполняла только parse_example и decode_raw, возвращая кортеж `((,), ())
  3. cache послеmap
  4. Перемещение функции _bit_to_float как компонента модели

Наконец, вот код конвейера данных:

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )


def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )


def build_model(input_shape):
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)

    model = keras.Model([feature, bool_feature], output)
    return model

def _bit_to_float(string_batch: tf.Tensor):
    return tf.dtypes.cast(tf.reshape(
        tf.bitwise.bitwise_and(
            tf.bitwise.right_shift(
                tf.expand_dims(string_batch, 2),
                tf.reshape(
                    tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                    (1, 1, 8)
                ),
            ),
            tf.constant(0x01, dtype=tf.uint8)
        ),
        (tf.shape(string_batch)[0], -1)
    ), tf.float32)

Спасибодля всех этих оптимизаций:

  • Потребление полосы пропускания составляет около 90 МБ / с
  • Загрузка ЦП составляет около 20%
  • Первая эпоха тратит 20минут
  • Последовательные эпохи проводят по 5 минут каждый

Так что, похоже, это хорошая первая установка.Но процессор и BW по-прежнему не используются слишком часто, поэтому любые советы приветствуются!

...