Помогите с алгоритмом слияния векторов - PullRequest
3 голосов
/ 18 сентября 2008

Мне нужен очень быстрый алгоритм для следующей задачи. Я уже реализовал несколько алгоритмов, которые дополняют его, но они слишком медленные для нужной мне производительности. Он должен быть достаточно быстрым, чтобы алгоритм мог запускаться не менее 100 000 раз в секунду на современном процессоре. Это будет реализовано в C ++.

Я работаю с интервалами / диапазонами, структурой с начальной и конечной координатами на линии.

У меня есть два вектора (динамических массива) диапазонов, и мне нужно объединить их. Один вектор src, а другой dst. Векторы сортируются по начальным координатам пролета, и пролеты не перекрываются в пределах одного вектора.

Пролеты в векторе src должны быть объединены с пролетами в векторе dst, чтобы результирующий вектор все еще сортировался и не перекрывался. То есть. если во время объединения обнаружены перекрытия, два участка объединяются в один. (Объединение двух пролетов - это просто вопрос изменения координат в структуре.)

Теперь есть еще один улов: во время слияния интервалы в векторе src должны быть "расширены". Это означает, что константа будет добавлена ​​в начало, а другая (большая) константа в конечную координату каждого диапазона в src. Это означает, что после расширения диапазона src они могут перекрываться.


Я пришел к тому, что до сих пор это невозможно сделать полностью на месте, требуется какое-то временное хранилище. Я думаю, что это должно быть выполнимо за линейное время по числу элементов src и dst, суммированных.

Любое временное хранилище может быть совместно использовано несколькими прогонами алгоритма.

Два основных подхода, которые я пробовал, которые слишком медленны:

  1. Добавьте все элементы src к dst, расширяя каждый элемент перед добавлением его. Затем выполните сортировку на месте. Наконец, выполните итерацию по результирующему вектору, используя указатели «чтение» и «запись», с указателем чтения, опережающим указатель записи, объединяя интервалы по мере их прохождения. Когда все элементы объединены (указатель чтения достигает конца) dst усекается.

  2. Создать временный рабочий вектор. Выполните наивное объединение, как описано выше, многократно выбирая следующий элемент из src или dst и сливаясь с рабочим вектором. Когда закончите, скопируйте рабочий вектор в dst, заменив его.

Первый метод имеет проблему с сортировкой O ((m + n) * log (m + n)) вместо O (m + n) и имеет некоторые накладные расходы. Это также означает, что вектор dst должен вырасти намного больше, чем нужно.

У второй есть основная проблема - много копировать и снова выделять / освобождать память.

Структуры данных, используемые для хранения / управления диапазонами / векторами, могут быть изменены, если вы считаете, что это необходимо.

Обновление: забыл сказать, насколько велики наборы данных. Наиболее распространенные случаи - от 4 до 30 элементов в любом векторе, либо либо dst пусто, либо существует большое перекрытие между диапазонами в src и dst.

Ответы [ 9 ]

8 голосов
/ 18 сентября 2008

Мы знаем, что абсолютное время выполнения наилучшего случая - O (m + n), это связано с тем, что вам по крайней мере нужно просмотреть все данные, чтобы иметь возможность объединить списки. Учитывая это, ваш второй метод должен дать вам такой тип поведения.

Вы описали свой второй метод, чтобы выяснить, какие узкие места? Вполне возможно, что в зависимости от объема данных, о которых вы говорите, на самом деле невозможно сделать то, что вы хотите, в указанный промежуток времени. Один из способов проверить это - сделать что-то простое, например, суммировать все начальные и конечные значения промежутков в каждом векторе в цикле и рассчитать время. В основном, здесь вы выполняете минимальный объем работы для каждого элемента в векторах. Это даст вам базовую характеристику для наилучшей производительности, которую вы можете ожидать.

Кроме того, вы можете избежать копирования элемента vectors за элементом, используя метод swl swap, и вы можете предварительно выделить временный вектор до определенного размера, чтобы избежать запуска расширения массива при объединении элементов.

Вы можете рассмотреть возможность использования 2 векторов в вашей системе, и всякий раз, когда вам нужно выполнить слияние, вы сливаетесь с неиспользуемым вектором, а затем меняете местами (это похоже на двойную буферизацию, используемую в графике). Таким образом, вам не нужно перераспределять векторы каждый раз, когда вы выполняете слияние.

Однако лучше всего сначала профилировать и выяснить, какое у вас узкое место. Если выделения минимальны по сравнению с фактическим процессом слияния, вам нужно выяснить, как сделать это быстрее.

Некоторые возможные дополнительные ускорения могут возникнуть при непосредственном доступе к необработанным данным векторов, что позволяет избежать проверок границ при каждом доступе к данным.

0 голосов
/ 18 сентября 2008

Какая у вас целевая система? Это многоядерный? Если это так, вы можете рассмотреть многопоточность этого алгоритма

0 голосов
/ 18 сентября 2008

Я бы всегда сохранял сортировку вектора своих векторов. Это делает реализацию алгоритмов ОЧЕНЬ легкой и возможной за линейное время.

ОК, поэтому я бы отсортировал промежутки по:

  • минимальный интервал в порядке возрастания
  • затем максимальный диапазон в порядке убывания

Для этого вам нужно создать функцию.

Тогда я бы использовал std :: set_union для объединения векторов (вы можете объединить несколько, прежде чем продолжить).

Затем для каждого последовательного набора отрезков с одинаковыми минимумами вы сохраняете первый и удаляете остальные (они являются вспомогательными отрезками первого пролета).

Тогда вам нужно объединить ваши пролеты. Это должно быть вполне выполнимо сейчас и выполнимо в линейное время.

Хорошо, вот трюк. Не пытайтесь сделать это на месте. Используйте один или несколько временных векторов (и заранее зарезервируйте достаточно места). Затем в конце вызовите std :: vector :: swap, чтобы поместить результаты во входной вектор по вашему выбору.

Надеюсь, этого достаточно, чтобы вы пошли.

0 голосов
/ 18 сентября 2008

Я написал новый класс контейнера только для этого алгоритма, адаптированный к потребностям. Это также дало мне возможность настроить другой код в моей программе, который в то же время получил небольшой прирост скорости.

Это значительно быстрее, чем в старой реализации, использующей векторы STL, но в остальном это было в основном то же самое. Но, хотя он и быстрее, он все же недостаточно быстр ... к сожалению.

Профилирование больше не показывает, что является настоящим узким местом. Профилировщик MSVC, кажется, иногда возлагает «вину» на неправильные вызовы (предположительно, идентичные прогоны назначают совершенно разные времена выполнения), и большинство вызовов объединяются в одну большую чинку.

Глядя на дизассемблирование сгенерированного кода, видно, что в сгенерированном коде есть очень большое количество переходов, я думаю, что это может быть основной причиной медлительности сейчас.

class SpanBuffer {
private:
    int *data;
    size_t allocated_size;
    size_t count;

    inline void EnsureSpace()
    {
        if (count == allocated_size)
            Reserve(count*2);
    }

public:
    struct Span {
        int start, end;
    };

public:
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    inline void AddIntersection(int x)
    {
        EnsureSpace();
        data[count++] = x;
    }

    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        EnsureSpace();
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    inline void Clear()
    {
        count = 0;
    }

    inline size_t GetCount() const
    {
        return count;
    }

    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            AddSpan(s, e);
        }
        else
        {
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    ShapeWidener(int _xofs) : xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;
    if (src.GetCount() + dst.GetCount() == 0) return;

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}
0 голосов
/ 18 сентября 2008

Если ваша последняя реализация все еще не достаточно быстра, вам может понадобиться альтернативные подходы.

Для чего вы используете выходы этой функции?

0 голосов
/ 18 сентября 2008

1 прямо - полная сортировка медленнее, чем объединение двух отсортированных списков.

Итак, вы смотрите на тонкую настройку 2 (или что-то совершенно новое).

Если вы измените структуры данных на двусвязные списки, вы можете объединить их в постоянном рабочем пространстве.

Используйте распределитель кучи фиксированного размера для узлов списка, чтобы уменьшить использование памяти на узел и повысить вероятность того, что узлы находятся близко друг к другу в памяти, уменьшая количество пропусков страниц.

Возможно, вы сможете найти код в Интернете или в своей любимой книге алгоритмов, чтобы оптимизировать объединение связанных списков. Вы можете настроить это для того, чтобы объединить интервалы одновременно со слиянием списка.

Чтобы оптимизировать объединение, сначала обратите внимание, что для каждого прогона значений, приходящих с одной и той же стороны, но не с другой стороны, вы можете вставить весь прогон в список dst за один раз, вместо того, чтобы вставлять каждый узел по очереди. , И вы можете сохранить одну запись на вставку поверх обычной операции со списком, оставив конец «болтаться», зная, что вы исправите его позже. И при условии, что вы не делаете удаления где-либо еще в вашем приложении, список может быть односвязным, что означает одну запись на узел.

Что касается времени выполнения 10 микросекунд - вид зависит от n и m ...

0 голосов
/ 18 сентября 2008

я не думаю, что строго линейное решение возможно, потому что расширение векторов src может в худшем случае привести к тому, что все они перекрываются (в зависимости от величины добавляемой вами константы)

проблема может быть в реализации, а не в алгоритме; Я хотел бы предложить профилировать код для ваших предыдущих решений, чтобы увидеть, где тратится время

рассуждения:

для действительно «современного» процессора, такого как Intel Core 2 Extreme QX9770, работающего на частоте 3,2 ГГц, можно ожидать около 59 455 MIPS

для 100 000 векторов вы должны обработать каждый вектор в 594 550 экземплярах. Это очень много инструкций.

ref: Википедия MIPS

Кроме того, обратите внимание, что добавление константы к векторам src не отменяет их сортировку, поэтому вы можете независимо нормализовать векторы src, а затем объединить их с векторами dst; это должно уменьшить нагрузку на ваш оригинальный алгоритм

0 голосов
/ 18 сентября 2008

Сортировка, которую вы упоминаете в подходе 1, может быть уменьшена до линейного времени (от лог-линейного, как вы его описали), потому что два входных списка уже отсортированы. Просто выполните шаг объединения сортировки слиянием. С помощью соответствующего представления для входных векторов диапазона (например, односвязных списков) это можно сделать на месте.

http://en.wikipedia.org/wiki/Merge_sort

0 голосов
/ 18 сентября 2008

Как насчет второго метода без повторного выделения - другими словами, выделите ваш временный вектор один раз, и никогда не выделяйте его снова? Или, если входные векторы достаточно малы (но не имеют постоянного размера), просто используйте alloca вместо malloc.

Также, с точки зрения скорости, вы можете убедиться, что ваш код использует CMOV для сортировки, поскольку, если код фактически ветвится для каждой отдельной итерации слияния:

if(src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

Прогнозирование ветвлений не будет выполнено в 50% случаев, что окажет огромное влияние на производительность. Условное перемещение, вероятно, будет намного лучше, поэтому убедитесь, что компилятор делает это, и если нет, попытайтесь убедить его сделать это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...