Как ускорить мой разреженный матричный решатель? - PullRequest
17 голосов
/ 05 марта 2010

Я пишу разреженный матричный решатель, используя метод Гаусса-Зейделя.По профилированию я определил, что около половины моей программы тратится внутри решателя.Критически важная часть производительности выглядит следующим образом:

size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
    for (size_t x = 1; x < d_nx - 1; ++x) {
        d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
        ++ic; ++iw; ++ie; ++is; ++in;
    }
    ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}

Все задействованные массивы относятся к типу float.На самом деле это не массивы, а объекты с перегруженным оператором [], который (я думаю) должен быть оптимизирован, но определяется следующим образом:

inline float &operator[](size_t i) { return d_cells[i]; }
inline float const &operator[](size_t i) const { return d_cells[i]; }

Для d_nx = d_ny = 128 это можно запуститьпримерно 3500 раз в секунду на Intel i7 920. Это означает, что тело внутреннего цикла работает 3500 * 128 * 128 = 57 миллионов раз в секунду.Поскольку речь идет только о какой-то простой арифметике, мне кажется, что процессор с тактовой частотой 2,66 ГГц выглядит как меньшее число.

Может быть, это не ограничивается мощностью процессора, а пропускной способностью памяти?Итак, один массив 128 * 128 float потребляет 65 кБ, поэтому все 6 массивов должны легко помещаться в кэш L3 ЦП (который составляет 8 МБ).Предполагая, что ничего не кэшируется в регистрах, я считаю 15 обращений к памяти во внутреннем теле цикла.В 64-битной системе это 120 байт на итерацию, поэтому 57 миллионов * 120 байт = 6,8 ГБ / с.Кэш-память L3 работает на частоте 2,66 ГГц, поэтому она того же порядка.Я предполагаю, что память действительно является узким местом.

Чтобы ускорить это, я попытался сделать следующее:

  • Скомпилировать с g++ -O3.(Ну, я делал это с самого начала.)

  • Распараллеливание на 4 ядра с использованием прагм OpenMP.Я должен перейти на алгоритм Якоби, чтобы избежать чтения и записи в тот же массив.Это требует, чтобы я выполнял вдвое больше итераций, что приводит к чистому результату с примерно одинаковой скоростью.

  • Работа с деталями реализации тела цикла, например использование указателей вместо индексов.Без эффекта.

Какой лучший способ ускорить этого парня?Поможет ли это переписать внутреннее тело в сборке (я должен сначала изучить это)?Должен ли я запустить это на GPU вместо этого (что я знаю, как это сделать, но это такая проблема)?Какие-нибудь другие яркие идеи?

(NB I do принимает «нет» за ответ, например: «это нельзя сделать значительно быстрее, потому что ...»)

Обновление : по запросу, вот полная программа:

#include <iostream>
#include <cstdlib>
#include <cstring>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

void solve(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Я компилирую и запускаю ее следующим образом:

$ g++ -o gstest -O3 gstest.cpp
$ time ./gstest 8000
0

real    0m1.052s
user    0m1.050s
sys     0m0.010s

(Это 8000вместо 3500 итераций в секунду, потому что моя «настоящая» программа тоже делает много других вещей. Но она репрезентативна.)

Обновление 2 : мне сказали, что унифицированные значения не могутбыть репрезентативным, потому что значения NaN и Inf могут замедлить процесс.Теперь очищаем память в примере кода.Тем не менее, для меня нет никакой разницы в скорости выполнения.

Ответы [ 7 ]

5 голосов
/ 05 марта 2010

Я думаю, что мне удалось оптимизировать его, вот код, создать новый проект в VC ++, добавить этот код и просто скомпилировать в «Release».

#include <iostream>
#include <cstdlib>
#include <cstring>

#define _WIN32_WINNT 0x0400
#define WIN32_LEAN_AND_MEAN
#include <windows.h>

#include <conio.h>

using namespace std;

size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void step_new() {
    //size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float
        *d_b_ic,
        *d_w_ic,
        *d_e_ic,
        *d_x_ic,
        *d_x_iw,
        *d_x_ie,
        *d_x_is,
        *d_x_in,
        *d_n_ic,
        *d_s_ic;

    d_b_ic = d_b;
    d_w_ic = d_w;
    d_e_ic = d_e;
    d_x_ic = d_x;
    d_x_iw = d_x;
    d_x_ie = d_x;
    d_x_is = d_x;
    d_x_in = d_x;
    d_n_ic = d_n;
    d_s_ic = d_s;

    for (size_t y = 1; y < d_ny - 1; ++y)
    {
        for (size_t x = 1; x < d_nx - 1; ++x)
        {
            /*d_x[ic] = d_b[ic]
                - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
                - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/
            *d_x_ic = *d_b_ic
                - *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie
                - *d_s_ic * *d_x_is - *d_n_ic * *d_x_in;
            //++ic; ++iw; ++ie; ++is; ++in;
            d_b_ic++;
            d_w_ic++;
            d_e_ic++;
            d_x_ic++;
            d_x_iw++;
            d_x_ie++;
            d_x_is++;
            d_x_in++;
            d_n_ic++;
            d_s_ic++;
        }
        //ic += 2; iw += 2; ie += 2; is += 2; in += 2;
        d_b_ic += 2;
        d_w_ic += 2;
        d_e_ic += 2;
        d_x_ic += 2;
        d_x_iw += 2;
        d_x_ie += 2;
        d_x_is += 2;
        d_x_in += 2;
        d_n_ic += 2;
        d_s_ic += 2;
    }
}

void solve_original(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_original();
    }
}
void solve_new(size_t iters) {
    for (size_t i = 0; i < iters; ++i) {
        step_new();
    }
}

void clear(float *a) {
    memset(a, 0, d_nx * d_ny * sizeof(float));
}

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d_b = new float[n]; clear(d_b);
    d_w = new float[n]; clear(d_w);
    d_e = new float[n]; clear(d_e);
    d_s = new float[n]; clear(d_s);
    d_n = new float[n]; clear(d_n);

    if(argc < 3)
        printf("app.exe (x)iters (o/n)algo\n");

    bool bOriginalStep = (argv[2][0] == 'o');
    size_t iters = atoi(argv[1]);

    /*printf("Press any key to start!");
    _getch();
    printf(" Running speed test..\n");*/

    __int64 freq, start, end, diff;
    if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq))
        throw "Not supported!";
    freq /= 1000000; // microseconds!
    {
        ::QueryPerformanceCounter((LARGE_INTEGER*)&start);
        if(bOriginalStep)
            solve_original(iters);
        else
            solve_new(iters);
        ::QueryPerformanceCounter((LARGE_INTEGER*)&end);
        diff = (end - start) / freq;
    }
    printf("Speed (%s)\t\t: %u\n", (bOriginalStep ? "original" : "new"), diff);
    //_getch();


    //cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Запустите его так:

app.exe 10000 o

app.exe 10000 n

«o» означает старый код, ваш.

«n» - мой, новыйодин.

Мои результаты: Скорость (оригинал):

1515028

1523171

1495988

Скорость (новая):

966012

984110

1006045

Улучшение примерно на 30%.

Логика: вы использовали счетчики индекса для доступа/ манипулировать.Я использую указатели.Во время работы установите точку останова на определенной строке кода вычисления в отладчике VC ++ и нажмите клавишу F8.Вы получите окно дизассемблера.Вы увидите произведенные коды операций (код сборки).

В любом случае, посмотрите:

int * x = ...;

x [3] = 123;

Это говорит ПК поставить указатель x на регистр (скажем, EAX).Добавьте его (3 * sizeof (int)).Только тогда установите значение 123.

Подход с указателями намного лучше, как вы можете понять, потому что мы сокращаем процесс добавления, фактически мы обрабатываем его сами, таким образом, можем оптимизировать при необходимости.

Надеюсь, это поможет.

Sidenote для сотрудников stackoverflow.com: Отличный сайт, надеюсь, я слышал об этом давно!

5 голосов
/ 06 марта 2010

Пара идей:

  1. Используйте SIMD.Вы можете загрузить 4 поплавка за раз из каждого массива в регистр SIMD (например, SSE на Intel, VMX на PowerPC).Недостатком этого является то, что некоторые из значений d_x будут «устаревшими», так что ваша скорость сходимости пострадает (но не так плохо, как итерация Якоби);Трудно сказать, компенсирует ли это ускорение.

  2. Использование SOR .Это просто, не требует большого количества вычислений и может улучшить вашу скорость сходимости даже при относительно консервативном значении релаксации (скажем, 1,5).

  3. Использовать сопряженный градиент.Если это для этапа проецирования моделирования жидкости (то есть обеспечения несжимаемости), вы должны иметь возможность применять CG и получить гораздо лучшую скорость сходимости.Хороший предварительный кондиционер помогает еще больше.

  4. Используйте специализированный решатель.Если линейная система возникает из уравнения Пуассона , вы можете добиться даже большего успеха, чем метод сопряженного градиента, используя методы, основанные на БПФ.похоже, система, которую вы пытаетесь решить, я могу дать еще несколько советов по № 3 и № 4.

3 голосов
/ 05 марта 2010

Во-первых, здесь, похоже, есть проблема с конвейеризацией. Цикл читает из значения в d_x, которое только что было записано, но, очевидно, он должен ждать завершения этой записи. Простая перестановка порядка вычислений, выполнение чего-то полезного во время ожидания делает его почти вдвое быстрее:

d_x[ic] = d_b[ic]
                        - d_e[ic] * d_x[ie]
    - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]
    - d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;

Это было Имон Нербонн , который понял это. Много голосов за него! Я бы никогда не догадался.

2 голосов
/ 05 марта 2010

Ответ Пони мне кажется правильным.

Я просто хочу отметить, что в этом типе проблемы вы часто получаете выгоду от локальности памяти. Сейчас массивы b,w,e,s,n находятся в разных местах памяти. Если бы вы могли не уместить проблему в кеше L3 (в основном в L2), то это было бы плохо, и решение такого рода было бы полезно:

size_t d_nx = 128, d_ny = 128;
float *d_x;

struct D { float b,w,e,s,n; };
D *d;

void step() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
            d_x[ic] = d[ic].b
                - d[ic].w * d_x[iw] - d[ic].e * d_x[ie]
                - d[ic].s * d_x[is] - d[ic].n * d_x[in];
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}
void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); }
void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); }

int main(int argc, char **argv) {
    size_t n = d_nx * d_ny;
    d_x = new float[n]; clear(d_x);
    d = new D[n]; memset(d,0,n * sizeof(D));
    solve(atoi(argv[1]));
    cout << d_x[0] << endl; // prevent the thing from being optimized away
}

Например, это решение с разрешением 1280x1280 чуть менее чем в 2 раза быстрее , чем решение Poni (13 против 23 в моем тесте - тогда ваша первоначальная реализация составляет 22 с), тогда как при 128x128 это 30% медленнее (7 с против 10 с - ваш оригинал 10 с).

(Количество итераций было увеличено до 80000 для базового случая и до 800 для увеличенного в 100 раз случая 1280x1280.)

1 голос
/ 05 марта 2010

Я не специалист по этому вопросу, но я видел, что есть несколько научных статей по улучшению использования кэша метода Гаусса-Зейделя.

Другой возможной оптимизацией является использование красно-черного варианта, где точки обновляются в два цикла по шахматному образцу. Таким образом, все обновления в цикле независимы и могут быть распараллелены.

1 голос
/ 05 марта 2010

Я думаю, вы правы в том, что память является узким местом.Это довольно простой цикл с простой арифметикой на каждую итерацию.ic, iw, то есть is, и в индексах, кажется, на противоположных сторонах матрицы, поэтому я предполагаю, что там есть куча пропусков кэша.

0 голосов
/ 06 марта 2010

Я предлагаю добавить некоторые операторы предварительной выборки, а также исследовать «ориентированный на данные проект»:

void step_original() {
    size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
    float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic;
    float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic;
    for (size_t y = 1; y < d_ny - 1; ++y) {
        for (size_t x = 1; x < d_nx - 1; ++x) {
// Perform the prefetch
// Sorting these statements by array may increase speed;
//    although sorting by index name may increase speed too.
            db_ic = d_b[ic];
            dw_ic = d_w[ic];
            dx_iw = d_x[iw];
            de_ic = d_e[ic];
            dx_ie = d_x[ie];
            ds_ic = d_s[ic];
            dx_is = d_x[is];
            dn_ic = d_n[ic];
            dx_in = d_x[in];
// Calculate
            d_x[ic] = db_ic
                - dw_ic * dx_iw - de_ic * dx_ie
                - ds_ic * dx_is - dn_ic * dx_in;
            ++ic; ++iw; ++ie; ++is; ++in;
        }
        ic += 2; iw += 2; ie += 2; is += 2; in += 2;
    }
}

Это отличается от вашего второго метода, поскольку значения копируются в локальные временные переменные до выполнения вычисления.

...