Оптимизированный способ поиска M самых больших элементов в массиве NxN с использованием C ++ - PullRequest
7 голосов
/ 19 августа 2011

Мне нужен невероятно быстрый способ найти двумерные позиции и значения M самых больших элементов в массиве NxN.

сейчас я делаю это:

struct SourcePoint {
    Point point;
    float value;
}

SourcePoint* maxValues = new SourcePoint[ M ];

maxCoefficients = new SourcePoint*[
for (int j = 0; j < rows; j++) {
    for (int i = 0; i < cols; i++) {
        float sample = arr[i][j];
        if (sample > maxValues[0].value) {
            int q = 1;
            while ( sample > maxValues[q].value && q < M ) {
                maxValues[q-1] = maxValues[q];      // shuffle the values back
                 q++;
            }
            maxValues[q-1].value = sample;
            maxValues[q-1].point = Point(i,j);
        }
    }
}

Структура Point - это всего два целых числа - x и y.

Этот код в основном выполняет вставку сортировки поступающих значений. MaxValues ​​[0] всегда содержит SourcePoint с наименьшим значением, которое по-прежнему удерживает его в верхних значениях M, достигнутых до сих пор. Это дает нам быстрый и легкий выход из кризиса, если sample <= maxValues, мы ничего не делаем. Проблема, с которой я сталкиваюсь - это перетасовка каждый раз, когда обнаруживается новое лучшее значение. Он работает весь путь до maxValues, пока не найдет свое место, перетасовывая все элементы в maxValues, чтобы освободить место для себя. </p>

Я подхожу к тому, что готов рассмотреть SIMD-решения или оптимизацию кэша, так как похоже, что происходит довольно частая переработка кэша. Сокращение стоимости этой операции существенно повлияет на производительность моего общего алгоритма, так как он вызывается много раз и составляет 60-80% моей общей стоимости.

Я пытался использовать std :: vector и make_heap, но я думаю, что затраты на создание кучи перевесили экономию операций кучи. Это вероятно потому, что M и N обычно невелики. М обычно составляет 10-20 и N 10-30 (NxN 100-900). Проблема в том, что эта операция вызывается неоднократно и не может быть предварительно вычислена.

Я только что подумал о предварительной загрузке первых M элементов maxValues, которые могут обеспечить небольшую экономию. В текущем алгоритме гарантируется, что первые M элементов полностью перетасуют себя до первоначального заполнения maxValues.

Любая помощь от гуру оптимизации будет высоко ценится:)

Ответы [ 10 ]

5 голосов
/ 20 августа 2011

Несколько идей, которые вы можете попробовать. В некоторых быстрых тестах с N = 100 и M = 15 я смог получить его примерно на 25% быстрее в VC ++ 2010, но протестировал его сам, чтобы посмотреть, поможет ли какой-либо из них в вашем случае. Некоторые из этих изменений могут не иметь или даже иметь отрицательный эффект в зависимости от фактического использования / данных и оптимизации компилятора.

  • Не выделяйте новый массив maxValues каждый раз, если в этом нет необходимости. Использование переменной стека вместо динамического выделения дает мне +5%.
  • Изменение g_Source[i][j] на g_Source[j][i] дает вам немного (не так много, как я ожидал).
  • Использование структуры SourcePoint1, приведенной внизу, дает мне еще несколько процентов.
  • Наибольший прирост около + 15% состоял в замене локальной переменной sample на g_Source[j][i]. Компилятор, вероятно, достаточно умен, чтобы оптимизировать множественные чтения в массив, что он не может сделать, если вы используете локальную переменную.
  • Попытка простого бинарного поиска принесла мне небольшую потерю в несколько процентов. Для больших M / N вы, вероятно, увидите выгоду.
  • Если возможно, попытайтесь сохранить исходные данные в arr[][] отсортированном, даже если только частично. В идеале вы хотите генерировать maxValues[] одновременно с созданием исходных данных.
  • Посмотрите, как данные создаются / хранятся / организуются, могут дать вам шаблоны или информацию, чтобы сократить количество времени на создание вашего maxValues[] массива. Например, в лучшем случае вы могли бы придумать формулу, которая дает вам верхние координаты M без необходимости итерации и сортировки.

Код для выше:

struct SourcePoint1 {
     int x;
     int y;
     float value;
     int test;       //Play with manual/compiler padding if needed
};
4 голосов
/ 20 августа 2011

(обновлено 22:37 UTC 2011-08-20)

Я предлагаю двоичную минимальную кучу фиксированного размера, содержащую М самых больших элементов (но все еще в порядке минимальной кучи!). Вероятно, на практике это не будет быстрее, так как я думаю, что сортировка вставок OP, вероятно, имеет приличную производительность в реальном мире (по крайней мере, если учесть рекомендации других авторов в этой теме).

В случае сбоя поиск должен выполняться с постоянным временем: если текущий элемент меньше минимального элемента кучи (содержащего элементы max M), мы можем отклонить его сразу.

Если окажется, что у нас есть элемент больше, чем текущий минимум кучи (самый большой элемент M), мы извлекаем (отбрасываем) предыдущий минимум и вставляем новый элемент.

Если элементы нужны в отсортированном порядке, куча может быть отсортирована впоследствии.

Первая попытка минимальной реализации C ++:

template<unsigned size, typename T>
class m_heap {
private: 
    T nodes[size];
    static const unsigned last = size - 1;

    static unsigned parent(unsigned i) { return (i - 1) / 2; }
    static unsigned left(unsigned i) { return i * 2; }
    static unsigned right(unsigned i) { return i * 2 + 1; }

    void bubble_down(unsigned int i) {
        for (;;) { 
            unsigned j = i;
            if (left(i) < size && nodes[left(i)] < nodes[i])
                j = left(i);
            if (right(i) < size && nodes[right(i)] < nodes[j])
                j = right(i);
            if (i != j) {
                swap(nodes[i], nodes[j]);
                i = j;
            } else {
                break;
            }
        }
    }

    void bubble_up(unsigned i) {
        while (i > 0 && nodes[i] < nodes[parent(i)]) {
            swap(nodes[parent(i)], nodes[i]);
            i = parent(i);
        }
    }

public:
    m_heap() {
        for (unsigned i = 0; i < size; i++) {
            nodes[i] = numeric_limits<T>::min();
        }
    }

    void add(const T& x) {
        if (x < nodes[0]) {
            // reject outright 
            return;
        }
        nodes[0] = x;
        swap(nodes[0], nodes[last]);
        bubble_down(0);
    }
};

Небольшой тест / случай использования:

#include <iostream>
#include <limits>
#include <algorithm>
#include <vector>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

using namespace std;

// INCLUDE TEMPLATED CLASS FROM ABOVE

typedef vector<float> vf;
bool compare(float a, float b) { return a > b; }

int main()
{
    int N = 2000;
    vf v;
    for (int i = 0; i < N; i++) v.push_back( rand()*1e6 / RAND_MAX);

    static const int M = 50;
    m_heap<M, float> h;
    for (int i = 0; i < N; i++) h.add( v[i] );

    sort(v.begin(), v.end(), compare);

    vf heap(h.get(), h.get() + M); // assume public in m_heap: T* get() { return nodes; }
    sort(heap.begin(), heap.end(), compare);

    cout << "Real\tFake" << endl;
    for (int i = 0; i < M; i++) {
        cout << v[i] << "\t" << heap[i] << endl;
        if (fabs(v[i] - heap[i]) > 1e-5) abort();
    }
}
4 голосов
/ 19 августа 2011

Если вы хотите начать микрооптимизацию на этом этапе, первым простым шагом должно быть избавление от Point s и просто объединение обоих измерений в единый тип int. Это уменьшает объем данных, которые вам нужно перемещать, и превращают SourcePoint в степень двух длин, что упрощает индексацию в нем.

Кроме того, вы уверены, что хранить отсортированный список лучше, чем просто пересчитывать, какой элемент является новым самым низким после каждого сдвига старого самого низкого?

2 голосов
/ 19 августа 2011

Быстрая оптимизация состояла бы в добавлении значения часового в ваш массив maxValues. Если у вас maxValues[M].value равно std::numeric_limits<float>::max(), вы можете отменить тест q < M в вашем цикле while.

2 голосов
/ 19 августа 2011

Вы ищете очередь с приоритетами :

template < class T, class Container = vector<T>,
       class Compare = less<typename Container::value_type> > 
       class priority_queue;

Вам нужно будет определить лучший нижележащий контейнер для использования и, вероятно, определить функцию Compare дляиметь дело с вашим типом Point.

Если вы хотите оптимизировать его, вы можете запустить очередь для каждой строки вашей матрицы в своем собственном рабочем потоке, а затем запустить алгоритм, чтобы выбрать самый большой элемент очередифронты, пока у вас есть ваши элементы М.

1 голос
/ 20 августа 2011

Прежде всего, вы перемещаетесь по массиву в неправильном порядке!

Вы всегда, всегда, всегда хотите сканировать память линейно.Это означает, что последний индекс вашего массива должен меняться быстрее всего.Поэтому вместо этого:

for (int j = 0; j < rows; j++) {
    for (int i = 0; i < cols; i++) {
        float sample = arr[i][j];

Попробуйте это:

for (int i = 0; i < cols; i++) {
    for (int j = 0; j < rows; j++) {
        float sample = arr[i][j];

Я предсказываю, что это будет иметь большее значение, чем любое другое отдельное изменение.

Далее, я быиспользуйте кучу вместо отсортированного массива.Стандартный заголовок <algorithm> уже имеет функции push_heap и pop_heap для использования вектора в качестве кучи.(Однако, вероятно, это не очень поможет, если только M не достаточно велик. Для небольших M и рандомизированного массива вы не будете выполнять все эти многочисленные вставки в среднем ... Что-то вроде O (logN) Я верю.)

Далее следует использовать SSE2.Но это арахис по сравнению с маршем по памяти в правильном порядке.

1 голос
/ 20 августа 2011

Одной из идей было бы использование алгоритма std::partial_sort для простой одномерной последовательности ссылок в ваш массив NxN.Вы также можете кэшировать эту последовательность ссылок для последующих вызовов.Я не знаю, насколько хорошо он работает, но стоит попробовать - если он работает достаточно хорошо, у вас не так много "магии".В частности, вы не прибегаете к микрооптимизации.

Рассмотрите эту витрину:

#include <algorithm>
#include <iostream>
#include <vector>

#include <stddef.h>

static const int M = 15;
static const int N = 20;

// Represents a reference to a sample of some two-dimensional array
class Sample
{
public:
    Sample( float *arr, size_t row, size_t col )
        : m_arr( arr ),
        m_row( row ),
        m_col( col )
    {
    }

    inline operator float() const {
        return m_arr[m_row * N + m_col];
    }

    bool operator<( const Sample &rhs ) const {
        return (float)other < (float)*this;
    }

    int row() const {
        return m_row;
    }

    int col() const {
        return m_col;
    }

private:
    float *m_arr;
    size_t m_row;
    size_t m_col;
};

int main()
{
    // Setup a demo array
    float arr[N][N];
    memset( arr, 0, sizeof( arr ) );

    // Put in some sample values
    arr[2][1] = 5.0;
    arr[9][11] = 2.0;
    arr[5][4] = 4.0;
    arr[15][7] = 3.0;
    arr[12][19] = 1.0;

    //  Setup the sequence of references into this array; you could keep
    // a copy of this sequence around to reuse it later, I think.
    std::vector<Sample> samples;
    samples.reserve( N * N );
    for ( size_t row = 0; row < N; ++row ) {
        for ( size_t col = 0; col < N; ++col ) {
            samples.push_back( Sample( (float *)arr, row, col ) );
        }
    }

    // Let partial_sort find the M largest entry
    std::partial_sort( samples.begin(), samples.begin() + M, samples.end() );

    // Print out the row/column of the M largest entries.
    for ( std::vector<Sample>::size_type i = 0; i < M; ++i ) {
        std::cout << "#" << (i + 1) << " is " << (float)samples[i] << " at " << samples[i].row() << "/" << samples[i].col() << std::endl;
    }
}
0 голосов
/ 22 августа 2011

Я попытался заменить float на double, и, что интересно, это дало мне повышение скорости примерно на 20% (при использовании VC ++ 2008). Это немного нелогично, но кажется, что современные процессоры или компиляторы оптимизированы для обработки двойных значений.

0 голосов
/ 20 августа 2011

Используйте связанный список для хранения лучших пока значений М.Вам все равно придется перебирать его, чтобы найти правильное место, но вставка O (1).Возможно, это даже лучше, чем бинарный поиск и вставка O (N) + O (1) против O (lg (n)) + O (N).Поменяйте местами форсы, чтобы вы не обращались к каждому элементу N в памяти и не разрушали кеш.


LE: Создание другой идеи, которая может работать для равномерно распределенных значений.
Найти минимум, максимум в 3/2 * O (N ^ 2) сравнениях.
Создать в любом месте от N до N ^ 2 равномерно распределенных сегментов, предпочтительно ближе к N ^ 2, чем N.
Для каждого элемента в матрице NxN поместите его в сегмент [(int) (значение-мин) / диапазон],range = max-min.
Наконец, создайте набор, начиная с самого верхнего сегмента до самого нижнего, добавляйте в него элементы из других сегментов, пока | текущий набор |+ | следующее ведро |<= M. <br>Если вы получили М элементов, все готово.Скорее всего, вы получите меньше элементов, чем M, скажем P.
Примените свой алгоритм для оставшегося сегмента и извлеките из него самые большие элементы MP.
Если элементы однородны и вы используете N ^ 2 сегментов, сложностьоколо 3,5 * (N ^ 2) против вашего текущего решения, которое составляет около O (N ^ 2) * ln (M).

0 голосов
/ 19 августа 2011

Вы сможете добиться почти линейного ускорения при параллельной обработке.

С N ЦП вы можете обрабатывать полосу rows/N строк (и всех столбцов) с каждым ЦП, находя верхнюю частьM записей в каждой группе.А затем выполните сортировку выбора, чтобы найти общий верх M.

. Вы, вероятно, могли бы сделать это и с SIMD (но здесь вы бы поделили задачу, чередуя столбцы вместо разделения строк).Не пытайтесь заставить SIMD выполнять сортировку вставок быстрее, заставляйте делать больше сортировок вставок одновременно, которые вы объединяете в конце, используя один очень быстрый шаг.

Естественно, вы можете выполнять как многопоточность, так иSIMD, но для проблемы, которая только 30x30, это вряд ли стоит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...