Более быстрый способ преобразования вектора векторов в один непрерывный вектор с противоположным порядком хранения - PullRequest
12 голосов
/ 19 марта 2019

У меня есть std::vector<std::vector<double>>, который я пытаюсь преобразовать в один смежный вектор как можно быстрее.Мой вектор имеет форму примерно 4000 x 50.

Проблема в том, что иногда мне нужен мой выходной вектор в непрерывном порядке по главному столбцу (просто конкатенация внутренних векторов моего 2-го входного вектора), а иногда мне нужен мой выходной вектор в последовательном порядке по главной строкетребующий транспонирования.

Я обнаружил, что наивный цикл for довольно быстр для преобразования в мажорный вектор столбца:

auto to_dense_column_major_naive(std::vector<std::vector<double>> const & vec)
    -> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    for (size_t i = 0; i < n_col; ++i)
        for (size_t j = 0; j < n_row; ++j)
            out_vec[i * n_row + j] = vec[i][j];
    return out_vec;
}

Но, очевидно, подобный подход очень медленный для построчного преобразования, потому чтовсего кеша не хватает.Так что для построчного преобразования я подумал, что стратегия блокирования для продвижения локальности кэша может быть моей лучшей ставкой:

auto to_dense_row_major_blocking(std::vector<std::vector<double>> const & vec)
    -> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    size_t block_side = 8;

    for (size_t l = 0; l < n_col; l += block_side) {
        for (size_t k = 0; k < n_row; k += block_side) {
            for (size_t j = l; j < l + block_side && j < n_col; ++j) {
                auto const &column = vec[j];
                for (size_t i = k; i < k + block_side && i < n_row; ++i)
                    out_vec[i * n_col + j] = column[i];
            }
        }
    }
    return out_vec;
}

Это значительно быстрее, чем простой цикл для преобразования по основным строкам, но все же почти на порядокпо величине медленнее, чем наивный цикл по главному столбцу на моем входном размере.

enter image description here

У меня вопрос , существует ли более быстрый подход к преобразованию (мажорный столбец) векторов двойников вединый смежный основной вектор-ряд?Я пытаюсь понять, каким должен быть предел скорости этого кода, и поэтому задаюсь вопросом, не упускаю ли я что-то очевидное.Мое предположение состояло в том, что блокирование даст мне намного большее ускорение, чем оно, по-видимому, дает.


Диаграмма была сгенерирована с использованием QuickBench (и несколько проверена с помощью GBench локально на моей машине) с этим кодом: (Clang 7, C ++ 20, -O3)

auto to_dense_column_major_naive(std::vector<std::vector<double>> const & vec)
    -> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    for (size_t i = 0; i < n_col; ++i)
        for (size_t j = 0; j < n_row; ++j)
            out_vec[i * n_row + j] = vec[i][j];
    return out_vec;
}

auto to_dense_row_major_naive(std::vector<std::vector<double>> const & vec)
    -> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    for (size_t i = 0; i < n_col; ++i)
        for (size_t j = 0; j < n_row; ++j)
            out_vec[j * n_col + i] = vec[i][j];
    return out_vec;
}

auto to_dense_row_major_blocking(std::vector<std::vector<double>> const & vec)
    -> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    size_t block_side = 8;

    for (size_t l = 0; l < n_col; l += block_side) {
        for (size_t k = 0; k < n_row; k += block_side) {
            for (size_t j = l; j < l + block_side && j < n_col; ++j) {
                auto const &column = vec[j];
                for (size_t i = k; i < k + block_side && i < n_row; ++i)
                    out_vec[i * n_col + j] = column[i];
            }
        }
    }
    return out_vec;
}

auto to_dense_column_major_blocking(std::vector<std::vector<double>> const & vec)
    -> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    size_t block_side = 8;

    for (size_t l = 0; l < n_col; l += block_side) {
        for (size_t k = 0; k < n_row; k += block_side) {
            for (size_t j = l; j < l + block_side && j < n_col; ++j) {
                auto const &column = vec[j];
                for (size_t i = k; i < k + block_side && i < n_row; ++i)
                    out_vec[j * n_row + i] = column[i];
            }
        }
    }
    return out_vec;
}

auto make_vecvec() -> std::vector<std::vector<double>>
{
  std::vector<std::vector<double>> vecvec(50, std::vector<double>(4000));
  std::mt19937 mersenne {2019};
  std::uniform_real_distribution<double> dist(-1000, 1000);
  for (auto &vec: vecvec)
   for (auto &val: vec)
       val = dist(mersenne);
  return vecvec;
}

static void NaiveColumnMajor(benchmark::State& state) {
  // Code before the loop is not measured

  auto vecvec = make_vecvec();
  for (auto _ : state) {
    benchmark::DoNotOptimize(to_dense_column_major_naive(vecvec));
  }
}
BENCHMARK(NaiveColumnMajor);

static void NaiveRowMajor(benchmark::State& state) {
  // Code before the loop is not measured

  auto vecvec = make_vecvec();
  for (auto _ : state) {
    benchmark::DoNotOptimize(to_dense_row_major_naive(vecvec));
  }
}
BENCHMARK(NaiveRowMajor);

static void BlockingRowMajor(benchmark::State& state) {
  // Code before the loop is not measured

  auto vecvec = make_vecvec();
  for (auto _ : state) {
    benchmark::DoNotOptimize(to_dense_row_major_blocking(vecvec));
  }
}
BENCHMARK(BlockingRowMajor);

static void BlockingColumnMajor(benchmark::State& state) {
  // Code before the loop is not measured

  auto vecvec = make_vecvec();
  for (auto _ : state) {
    benchmark::DoNotOptimize(to_dense_column_major_blocking(vecvec));
  }
}
BENCHMARK(BlockingColumnMajor);

Ответы [ 2 ]

8 голосов
/ 22 марта 2019

Прежде всего, я съеживаюсь всякий раз, когда что-то квалифицируется как «очевидно».Это слово часто используется, чтобы скрыть недостатки в своих вычетах.

Но очевидно, что подобный подход очень медленный для построчного преобразования из-за всех ошибок кеша.

Я не уверен, что должно быть очевидным: будет показываться строковое преобразование или что оно медленное из-за пропадания кэша.В любом случае, я нахожу это не очевидным.В конце концов, здесь есть два аспекта кэширования, не так ли?Один для чтения и один для письма?Давайте посмотрим на код с точки зрения чтения:

row_major_naive

for (size_t i = 0; i < n_col; ++i)
    for (size_t j = 0; j < n_row; ++j)
        out_vec[j * n_col + i] = vec[i][j];

Последовательные чтения из vec - это чтения непрерывной памяти: vec[i][0], за которыми следует vec[i][1] и т. Д. Очень хорошо для кеширования.Итак ... кеш отсутствует?Медленный?:) Может быть, не так очевидно.

Тем не менее, есть кое-что, что можно почерпнуть из этого.Требование только неправильно, утверждая "очевидно".Есть не локальные проблемы, но они возникают в конце письма.(Последовательные записи компенсируются пробелом для значений 50 double.) Эмпирическое тестирование подтверждает медлительность.Таким образом, может быть, решение состоит в том, чтобы переключить то, что считается «очевидным»?

перевернутая мажорная строка

for (size_t j = 0; j < n_row; ++j)
    for (size_t i = 0; i < n_col; ++i)
        out_vec[j * n_col + i] = vec[i][j];

Все, что я здесь сделал, - это повернул петли.Буквально поменяйте местами порядок этих двух строк кода, затем измените отступ.Теперь последовательные чтения потенциально повсюду, так как они читают из разных векторов.Тем не менее, последовательные записи теперь в смежные блоки памяти.В каком-то смысле мы находимся в той же ситуации, что и раньше.Но так же, как и раньше, нужно измерить производительность, прежде чем принять «быстрое» или «медленное».

NaiveColumnMajor: 3,4 секунды
NaiveRowMajor: 7,7 секунды
FlippedRowMajor: 4,2 секунды
BlockingRowMajor: 4,4 секунды
BlockingColumnMajor: 3,9 секунды

Все еще медленнее, чем основная конверсия наивного столбца.Однако этот подход не только быстрее наивного майора строк, но и быстрее, чем блокирует мажор строк.По крайней мере, на моем компьютере (используя gcc -O3 и , очевидно, : P, повторяясь тысячи раз).Пробег может отличаться.Я не знаю, что скажут необычные инструменты профилирования.Дело в том, что иногда проще, тем лучше.

Для забавы я сделал тест, в котором размеры меняются местами (изменяясь от 50 векторов 4000 элементов до 4000 векторов 50 элементов).Все методы пострадали таким образом, но "NaiveRowMajor" получил самый большой удар.Стоит отметить, что «перевернутая мажорная строка» отстала от версии блокировки.Таким образом, как и следовало ожидать, лучший инструмент для работы зависит от того, что именно работа.секунд
BlockingRowMajor: 4,9 секунды
BlockingColumnMajor: 4,5 секунды

(Кстати, я также попробовал трюк с переворотом на версии блокировки. Изменение было небольшим - около 0,2 -и противоположность отклонению наивной версии. То есть «блокировка с переворотом» была медленнее, чем «блокировка» для векторов вопроса 50 на 4000, но быстрее для моего варианта 4000 на 50. Точная настройка может улучшить результаты.)


Обновление: Я провел еще немного тестирования с помощью трюка с переворотом на блокирующей версии.Эта версия имеет четыре цикла, поэтому «переворот» не так прост, как в случае только двух циклов.Похоже, что замена порядка двух внешних циклов плохо влияет на производительность, а замена внутренних двух циклов - это хорошо.(Первоначально я сделал оба и получил смешанные результаты.) Когда я поменял местами только внутренние петли, я измерил 3,8 секунды (и 4,1 секунды в сценарии 4000-из-50), делая это лучшим рядом-много вариант в моих тестах.

рядный основной гибрид

for (size_t l = 0; l < n_col; l += block_side)
    for (size_t i = 0; i < n_row; ++i)
        for (size_t j = l; j < l + block_side && j < n_col; ++j)
            out_vec[i * n_col + j] = vec[j][i];

(После замены внутренних циклов я объединил средние циклы.)

Что касается теории, стоящей за этим, я бы предположил, что это равносильно попытке записать один блок кэша за раз.Как только блок записан, попробуйте повторно использовать векторы (vec[j]), прежде чем они будут извлечены из кэша.После того, как вы исчерпали эти исходные векторы, переходите к новой группе исходных векторов, снова записывая полные блоки за раз.

0 голосов
/ 22 марта 2019

Я только что добавил две функции параллельной версии вещей

#include <ppl.h>

auto ppl_to_dense_column_major_naive(std::vector<std::vector<double>> const & vec)
-> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);

    size_t vecLen = out_vec.size();
    concurrency::parallel_for(size_t(0), vecLen, [&](size_t i)
    {
        size_t row = i / n_row;
        size_t column = i % n_row;

        out_vec[i] = vec[row][column];
    });

    return out_vec;
}

auto ppl_to_dense_row_major_naive(std::vector<std::vector<double>> const & vec)
-> std::vector<double>
{
    auto n_col = vec.size();
    auto n_row = vec[0].size();
    std::vector<double> out_vec(n_col * n_row);
    size_t vecLen = out_vec.size();


    concurrency::parallel_for(size_t(0), vecLen, [&](size_t i)
    {
        size_t column = i / n_col;
        size_t row = i % n_col;

        out_vec[i] = vec[row][column];
    });

    return out_vec;
}

и дополнительные эталонные коды для всех них

template< class _Fn, class ... Args >
auto callFncWithPerformance( std::string strFnName,  _Fn toCall, Args&& ...args )
{
    auto start = std::chrono::high_resolution_clock::now();
    auto toRet = toCall( std::forward<Args>(args)... );
    auto end = std::chrono::high_resolution_clock::now();

    std::chrono::duration<double> diff = end - start;

    std::cout << strFnName << ": " << diff.count() << " s" << std::endl;

    return toRet;
}

template< class _Fn, class ... Args >
auto second_callFncWithPerformance(_Fn toCall, Args&& ...args)
{
    std::string strFnName(typeid(toCall).name());

    auto start = std::chrono::high_resolution_clock::now();
    auto toRet = toCall(std::forward<Args>(args)...);
    auto end = std::chrono::high_resolution_clock::now();

    std::chrono::duration<double> diff = end - start;

    std::cout << strFnName << ": " << diff.count() << " s";

    return toRet;
}


#define MAKEVEC( FN, ... ) callFncWithPerformance( std::string( #FN ) , FN  , __VA_ARGS__ )


int main()
{
    //prepare vector
    auto vec = make_vecvec();

    std::vector< double > vecs[]
    {
        std::vector<double>(MAKEVEC(to_dense_column_major_naive, vec)),
        std::vector<double>(MAKEVEC(to_dense_row_major_naive, vec)),
        std::vector<double>(MAKEVEC(ppl_to_dense_column_major_naive, vec)),
        std::vector<double>(MAKEVEC(ppl_to_dense_row_major_naive, vec)),
        std::vector<double>(MAKEVEC(to_dense_row_major_blocking, vec)),
        std::vector<double>(MAKEVEC(to_dense_column_major_blocking, vec)),
    };

    //system("pause");

    return 0;
}

и вот ниже результат этих

Debug x64

to_dense_column_major_naive: 0.166859 s
to_dense_row_major_naive: 0.192488 s
ppl_to_dense_column_major_naive: 0.0557423 с
ppl_to_dense_row_major_naive: 0.0514017 с
to_dense_column_major_blocking: 0.118465 s
to_dense_row_major_blocking:0,117732 с

отладка x86

to_dense_column_major_naive: 0,15242 с
to_dense_row_major_naive: 0,158746 с
ppl_to_dense_column_major_naive_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_0_023с
to_dense_row_major_blocking: 0,107727 с

Выпуск x64

to_dense_column_major_naive: 0,000874 с
to_dense_row_major_naive: 0,0011973 с
стол__с___с___с___с____________0_054639 с
ppl_to_dense_row_major_naive: 0,0012034 с
to_dense_column_major_blocking: 0,0008023 с
to_dense_row_major_blocking: 0,0010282 с

*1038*

10,015_1515: 515: 515: 515: 515: 515: 515: 15: 15: 15: 15_15: 15: 15: 15: 15: 15: 15: 15: 15: 15-х, добившихся до тех пор:с
ppl_to_dense_column_major_naive: 0,0053351 с
ppl_to_dense_row_major_naive: 0,0013022 с
to_dense_column_major_blocking: 0,0008761 с
to_dense_row_major_40 * 10 * 10 * 1040 - всего 244слишком маленький набор данных.
А также он слишком мал, работает.
Хотя я буду пост для кого-то еще, чтобы ссылаться на эти функции.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...