Оптимизация поиска ближайшего значения в списке значений - PullRequest
1 голос
/ 16 октября 2019

Я ценю, что подобные вопросы задавались ниже, но ни один из тех, что я видел, не превышал десятки тысяч строк. Они, хотя и «большие», относительно тривиальны, даже если код довольно медленный. В моем случае я имею дело с миллионами и сотнями миллионов строк, поэтому даже небольшая оптимизация может оказаться весьма полезной.

Ответы, на которые я до сих пор опирался, - здесь и здесь . Обновления, которые я сделал, не удивительны ...

Для широкого обзора у меня есть справочный файл приблизительно из 2 000 000 строк (формат easting-northing-value), из которого я ищу наиболее близкое значение к ProcessedLineData (я не могу гарантировать, что точное совпадение существует в моем списке, поэтому мне нужно вычислять по всему списку для каждого экземпляра, чтобы найти наиболее близкую точку, которая работает).

Первый вариант, который у меня был, который был невероятномедленно, было:

AsciiFile.Sample _closestSample = Modifications.Value_and_Ref_Calc.ReferenceFile.Dataset[0].Data
                            .OrderBy(t => Public.CalculateDistance(t.Easting, t.Northing, ProcessedLineData.Easting, ProcessedLineData.Northing))
                            .First();

После этого я подумал, что Aggregate второго примера будет работать лучше, поэтому я пошел на это:

AsciiFile.Sample _closestSample = Modifications.Value_and_Ref_Calc.ReferenceFile.Dataset[0].Data
                            .Aggregate((x, y) => 
                              Public.CalculateDistance(x.Easting, x.Northing, ProcessedLineData.Easting, ProcessedLineData.Northing) 
                              < 
                              Public.CalculateDistance(y.Easting, y.Northing, ProcessedLineData.Easting, ProcessedLineData.Northing) 
                              ? x : y);

Это все еще довольно медленно (довольнотак же, как и предыдущий). Затем я подумал об использовании ToLookup(), который также описан в одном из примеров, но я не видел, какие конкретные преимущества это принесет.

В случае, если это уместно, Public.CalculateDistance будет:

public static double CalculateDistance(double _x1, double _y1, double _x2, double _y2)
        {
            return Math.Sqrt(Math.Pow(_x1 - _x2, 2) + Math.Pow(_y1 - _y2, 2));
        }

В этих примерах Modifications.Value_and_Ref_Calc.ReferenceFile.Dataset[0].Data - мой список из 2 000 000 контрольных точек.

Это было бы менее важно, если бы у меня не было сотен миллионов ProcessedLineData точек, к которым я нахожу самые близкие значения (ну, многочисленные файлы от маленьких с десятками тысяч точек до большихс десятками миллионов точек) ...

Конечная цель: я нахожу ближайшее значение, чтобы иметь возможность использовать высоту, связанную с конкретным Modifications.Value_and_Ref_Calc.ReferenceFile.Dataset[0].Data, и использовать его для изменения значенияпо моему ProcessedLineData. Самая медленная часть всей этой последовательности - это, безусловно, поиск моих ближайших ценностей.

Существуют ли очевидные способы оптимизации этого кода, который мне не хватает?

Ответы [ 2 ]

2 голосов
/ 17 октября 2019

Не зная ничего больше о диапазоне и точности значений или слишком много думая о распределении поисков по сравнению с изменениями списка контрольных точек, некоторые простые оптимизации цикла for дают примерно 30-кратное ускорение для 100 поисков по сравнению сбыстрее OrderBy / First код:

Используя pld для ваших ProcessedLineData и data для Modifications.Value_and_Ref_Calc.ReferenceFile.Dataset[0].Data вы получите:

var _closestSample = data[0];
var dist = (_closestSample.Easting - pld.Easting) * (_closestSample.Easting - pld.Easting) + (_closestSample.Northing - pld.Northing) * (_closestSample.Northing - pld.Northing);
for (int j2 = 1; j2 < data.Count; ++j2) {
    var y = data[j2];
    var ydist = (y.Easting - pld.Easting) * (y.Easting - pld.Easting) + (y.Northing - pld.Northing) * (y.Northing - pld.Northing);
    if (ydist < dist) {
        dist = ydist;
        _closestSample = y;
    }
}

С моим временем более2 000 000 записей data списка и 100 поисков, OrderBy / First занимает 2,22 секунды, а for занимает 0,06 секунды для ускорения в 32 раза.

Итак, я был уверен, что есть лучший способчем грубой силой, и после некоторых исследований обнаружили коды Мортона и кривые Гильберта . Небольшая работа сгенерировала класс SpatialIndex с использованием кривой Гильберта и класс SpatialIndexMorton с использованием индекса Мортона. Я также настроил индекс Гильберта, чтобы индексировать только биты 16-32, что обеспечивало оптимальный поиск в секунду. По моим данным, кривая Мортона теперь несколько быстрее.

Используя те же тесты случайных данных, я получаю, что метод for может выполнять 147 просмотров в секунду, поиск по Гильберту с индексом 5634 в секунду и МортонаИндексируйте 7370 поисков в секунду более 10 000 поисков с 2 000 000 контрольных точек. Обратите внимание, что пространственные индексы имеют время установки около 3 секунд, поэтому для очень небольшого числа поисков быстрее использовать грубую силу с for - я получаю время безубыточности около 468 поисков.

Чтобы сделать это (несколько) общего назначения, я начал с (C # 8.0) интерфейса для координат Земли, который предоставляет несколько вспомогательных методов:

public interface ICoordinate {
    double Longitude { get; set; }
    double Latitude { get; set; }

    public ulong MortonCode() {
        float f = (float)Latitude;
        uint ui;
        unsafe { // perform unsafe cast (preserving raw binary)
            float* fRef = &f;
            ui = *((uint*)fRef);
        }
        ulong ixl = ui;

        f = (float)Longitude;
        unsafe { // perform unsafe cast (preserving raw binary)
            float* fRef = &f;
            ui = *((uint*)fRef);
        }
        ulong iyl = ui;

        ixl = (ixl | (ixl << 16)) & 0x0000ffff0000ffffL;
        iyl = (iyl | (iyl << 16)) & 0x0000ffff0000ffffL;

        ixl = (ixl | (ixl << 8)) & 0x00ff00ff00ff00ffL;
        iyl = (iyl | (iyl << 8)) & 0x00ff00ff00ff00ffL;

        ixl = (ixl | (ixl << 4)) & 0x0f0f0f0f0f0f0f0fL;
        iyl = (iyl | (iyl << 4)) & 0x0f0f0f0f0f0f0f0fL;

        ixl = (ixl | (ixl << 2)) & 0x3333333333333333L;
        iyl = (iyl | (iyl << 2)) & 0x3333333333333333L;

        ixl = (ixl | (ixl << 1)) & 0x5555555555555555L;
        iyl = (iyl | (iyl << 1)) & 0x5555555555555555L;

        return ixl | (iyl << 1);
    }

    const int StartBitMinus1 = 31;
    const int EndBit = 16;

    //convert (x,y) to 31-bit Hilbert Index
    public ulong HilbertIndex() {
        float f = (float)Latitude;
        uint x;
        unsafe { // perform unsafe cast (preserving raw binary)
            float* fRef = &f;
            x = *((uint*)fRef);
        }

        f = (float)Longitude;
        uint y;
        unsafe { // perform unsafe cast (preserving raw binary)
            float* fRef = &f;
            y = *((uint*)fRef);
        }

        ulong hi = 0;
        for (int bitpos = StartBitMinus1; bitpos >= EndBit; --bitpos) {
            // extract s'th bit from x & y
            var rx = (x >> bitpos) & 1;
            var ry = (y >> bitpos) & 1;
            hi <<= 2;
            hi += (rx << 1) + (rx ^ ry);

            //rotate/flip a quadrant appropriately
            if (ry == 0) {
                if (rx == 1) {
                    x = ~x;
                    y = ~y;
                }

                //Swap x and y
                uint t = x;
                x = y;
                y = t;
            }
        }
        return hi;
    }
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public double DistanceTo(ICoordinate b) =>
        Math.Sqrt((Longitude - b.Longitude) * (Longitude - b.Longitude) + (Latitude - b.Latitude) * (Latitude - b.Latitude));
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public double Distance2To(ICoordinate b) => (Longitude - b.Longitude) * (Longitude - b.Longitude) + (Latitude - b.Latitude) * (Latitude - b.Latitude);

    public ICoordinate MakeNew(double plat, double plong);
}

public static class ICoordinateExt {
    public static ICoordinate Minus(this ICoordinate a, ICoordinate b) =>
        a.MakeNew(a.Latitude - b.Latitude, a.Longitude - b.Longitude);
    public static ICoordinate Plus(this ICoordinate a, ICoordinate b) =>
        a.MakeNew(a.Latitude + b.Latitude, a.Longitude + b.Longitude);
}

Затем реальный класс дляреализовать интерфейс (который будет заменен вашим реальным классом):

public class PointOfInterest : ICoordinate {
    public double Longitude { get; set; }
    public double Latitude { get; set; }

    public PointOfInterest(double plat, double plong) {
        Latitude = plat;
        Longitude = plong;
    }

    public ICoordinate MakeNew(double plat, double plong) => new PointOfInterest(plat, plong);
}

И класс для преобразования IEnumerable<ICoordinate> в пространственно индексированную коллекцию ICoordinate с использованием кривой Гильберта:

public class SpatialIndex {
    SortedList<ulong, List<ICoordinate>> orderedData;
    List<ulong> orderedIndexes;

    public SpatialIndex(IEnumerable<ICoordinate> data) {
        orderedData = data.GroupBy(d => d.HilbertIndex()).ToSortedList(g => g.Key, g => g.ToList());
        orderedIndexes = orderedData.Keys.ToList();
    }

    public ICoordinate FindNearest(ICoordinate aPoint) {
        var hi = aPoint.HilbertIndex();
        var nearestIndex = orderedIndexes.FindNearestIndex(hi);
        var nearestGuess = orderedData.Values[nearestIndex][0];
        var guessDist = (nearestGuess.Longitude - aPoint.Longitude) * (nearestGuess.Longitude - aPoint.Longitude) + (nearestGuess.Latitude - aPoint.Latitude) * (nearestGuess.Latitude - aPoint.Latitude);
        if (nearestIndex > 0) {
            var tryGuess = orderedData.Values[nearestIndex-1][0];
            var tryDist = (tryGuess.Longitude - aPoint.Longitude) * (tryGuess.Longitude - aPoint.Longitude) + (tryGuess.Latitude - aPoint.Latitude) * (tryGuess.Latitude - aPoint.Latitude);
            if (tryDist < guessDist) {
                nearestGuess = tryGuess;
                guessDist = tryDist;
            }
        }

        var offsetPOI = new PointOfInterest(guessDist, guessDist);
        var minhi = (aPoint.Minus(offsetPOI)).HilbertIndex();
        var minhii = orderedIndexes.FindNearestIndex(minhi);
        if (minhii > 0)
            --minhii;
        var maxhi = (aPoint.Plus(offsetPOI)).HilbertIndex();
        var maxhii = orderedIndexes.FindNearestIndex(maxhi);
        for (int j2 = minhii; j2 < maxhii; ++j2) {
            var tryList = orderedData.Values[j2];
            for (int j3 = 0; j3 < tryList.Count; ++j3) {
                var y = tryList[j3];
                var ydist = (y.Longitude - aPoint.Longitude) * (y.Longitude - aPoint.Longitude) + (y.Latitude - aPoint.Latitude) * (y.Latitude - aPoint.Latitude);
                if (ydist < guessDist) {
                    nearestGuess = y;
                    guessDist = ydist;
                }
            }
        }

        return nearestGuess;
    }
}

И подобный класс, использующий Кривую Мортона:

public class SpatialIndexMorton {
    SortedList<ulong, List<ICoordinate>> orderedData;
    List<ulong> orderedIndexes;

    public SpatialIndexMorton(IEnumerable<ICoordinate> data) {
        orderedData = data.GroupBy(d => d.MortonCode()).ToSortedList(g => g.Key, g => g.ToList());
        orderedIndexes = orderedData.Keys.ToList();
    }

    public ICoordinate FindNearest(ICoordinate aPoint) {
        var mc = aPoint.MortonCode();
        var nearestIndex = orderedIndexes.FindNearestIndex(mc);
        var nearestGuess = orderedData.Values[nearestIndex][0];
        var guessDist = (nearestGuess.Longitude - aPoint.Longitude) * (nearestGuess.Longitude - aPoint.Longitude) + (nearestGuess.Latitude - aPoint.Latitude) * (nearestGuess.Latitude - aPoint.Latitude);
        if (nearestIndex > 0) {
            var tryGuess = orderedData.Values[nearestIndex-1][0];
            var tryDist = (tryGuess.Longitude - aPoint.Longitude) * (tryGuess.Longitude - aPoint.Longitude) + (tryGuess.Latitude - aPoint.Latitude) * (tryGuess.Latitude - aPoint.Latitude);
            if (tryDist < guessDist) {
                nearestGuess = tryGuess;
                guessDist = tryDist;
            }
        }

        var offsetPOI = new PointOfInterest(guessDist, guessDist);
        var minmc = (aPoint.Minus(offsetPOI)).MortonCode();
        var minmci = orderedIndexes.FindNearestIndex(minmc);
        if (minmci > 0)
            --minmci;
        var maxmc = (aPoint.Plus(offsetPOI)).MortonCode();
        var maxmci = orderedIndexes.FindNearestIndex(maxmc);
        for (int j2 = minmci; j2 < maxmci; ++j2) {
            var tryList = orderedData.Values[j2];
            for (int j3 = 0; j3 < tryList.Count; ++j3) {
                var y = tryList[j3];
                var ydist = (y.Longitude - aPoint.Longitude) * (y.Longitude - aPoint.Longitude) + (y.Latitude - aPoint.Latitude) * (y.Latitude - aPoint.Latitude);
                if (ydist < guessDist) {
                    nearestGuess = y;
                    guessDist = ydist;
                }
            }
        }

        return nearestGuess;
    }
}

И некоторые вспомогательные методы расширения:

public static class ListExt {
    public static int FindNearestIndex<T>(this List<T> l, T possibleKey) {
        var keyIndex = l.BinarySearch(possibleKey);
        if (keyIndex < 0) {
            keyIndex = ~keyIndex;
            if (keyIndex == l.Count)
                keyIndex = l.Count - 1;
        }
        return keyIndex;
    }
}

public static class IEnumerableExt {
    public static SortedList<TKey, TValue> ToSortedList<T, TKey, TValue>(this IEnumerable<T> src, Func<T, TKey> keySelector, Func<T, TValue> valueSelector) =>
        new SortedList<TKey, TValue>(src.ToDictionary(keySelector, valueSelector));    
}

Наконец, некоторые примеры кода, использующие это, с вашими ссылками вdata, и ваши значения поиска в plds:

var hilbertIndex = new SpatialIndex(data);
var ans = new (ICoordinate, ICoordinate)[lookups];
for (int j1 = 0; j1 < lookups; ++j1) {
    ICoordinate pld = plds[j1];
    ans[j1] = (pld, hilbertIndex.FindNearest(pld));
}

ОБНОВЛЕНИЕ: я изменил алгоритм поиска ближайшего, чтобы он брал самую близкую точку выше и ниже целевой точки индекса вместо простой попыткитот, что выше. Это обеспечивает еще одно приличное ускорение.

0 голосов
/ 22 октября 2019

Вы можете попробовать уменьшить диапазон каждого поиска, используя сетку, чтобы разделить плоскость на равные квадраты. Каждый элемент будет храниться в ведре соответствующего квадрата. Затем вы можете использовать эту сетку для выполнения поиска, начиная с квадрата, содержащего искомую точку, и по спирали наружу, пока не найдете один или несколько заполненных квадратов по периметру спирали. Это наивный алгоритм, но он может работать на удивление хорошо при определенных условиях:

  1. Распределение элементов на плоскости должно быть случайным (или близким к случайному). Если большинство элементов скопилось в нескольких населенных пунктах, производительность пострадает.

  2. Найденные точки будут находиться в общей населенной области карты. Поиск далеко-далеко или внутри обширных незаселенных районов может занять вечность.

Важно правильно настроить размер квадратов сетки. Слишком большие или слишком маленькие квадраты могут также повредить производительность. Слишком маленькие квадраты также увеличат потребление памяти, потому что каждый заполненный квадрат требует своего собственного сегмента. В идеальных условиях алгоритм может работать более чем в 1000 раз быстрее по сравнению с простым циклом.

public class SpatialDictionary<T> : IEnumerable<T>
{
    private readonly Dictionary<(int, int), List<T>> _dictionary;
    private readonly double _squareSize;
    private readonly Func<T, (double, double)> _locationSelector;
    private int _count;

    public int Count => _count;

    public SpatialDictionary(
        double squareSize, Func<T, (double, double)> locationSelector)
    {
        if (squareSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(squareSize));
        _squareSize = squareSize;
        _locationSelector = locationSelector
            ?? throw new ArgumentNullException(nameof(locationSelector));
        _dictionary = new Dictionary<(int, int), List<T>>();
    }

    public void Add(T item)
    {
        var (itemX, itemY) = _locationSelector(item);
        int keyX = checked((int)(itemX / _squareSize));
        int keyY = checked((int)(itemY / _squareSize));
        if (!_dictionary.TryGetValue((keyX, keyY), out var bucket))
        {
            bucket = new List<T>(1);
            _dictionary.Add((keyX, keyY), bucket);
        }
        bucket.Add(item);
        _count++;
    }

    public T FindClosest(double x, double y)
    {
        if (_count == 0) throw new InvalidOperationException();
        int keyX = checked((int)(x / _squareSize));
        int keyY = checked((int)(y / _squareSize));
        double minDistance = Double.PositiveInfinity;
        T minItem = default;
        int radius = 0;
        while (true)
        {
            checked { radius++; }
            foreach (var square in GetSquares(keyX, keyY, radius))
            {
                if (!_dictionary.TryGetValue(square, out var bucket)) continue;
                foreach (var item in bucket)
                {
                    var (itemX, itemY) = _locationSelector(item);
                    var distX = x - itemX;
                    var distY = y - itemY;
                    var distance = Math.Abs(distX * distX + distY * distY);
                    if (distance < minDistance)
                    {
                        minDistance = distance;
                        minItem = item;
                    }
                }
            }
            if (minDistance != Double.PositiveInfinity) return minItem;
        }
    }

    private IEnumerable<(int, int)> GetSquares(int x, int y, int radius)
    {
        if (radius == 1) yield return (x, y);
        for (int i = -radius; i < radius; i++)
        {
            yield return checked((x + i, y + radius));
            yield return checked((x - i, y - radius));
            yield return checked((x + radius, y - i));
            yield return checked((x - radius, y + i));
        }
    }

    public IEnumerator<T> GetEnumerator()
        => _dictionary.Values.SelectMany(b => b).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

Пример использования:

var spatialData = new SpatialDictionary<DataRow>(squareSize: 10.0,
    dr => (dr.Field<double>("Easting"), dr.Field<double>("Northing")));

foreach (DataRow dataRow in dataTable.Rows)
{
    spatialData.Add(dataRow);
}

DataRow result = spatialData.FindClosest(100.0, 100.0);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...