Учитывая два списка 2-х точек, как найти ближайшую точку во 2-м списке для каждой точки в 1-м списке? - PullRequest
1 голос
/ 16 июня 2020

У меня есть два больших numpy массива случайно отсортированных 2d точек, скажем, это A и B. Мне нужно найти количество «совпадений» между двумя массивами, где совпадение - это точка в A (назовите его A '), находящимся в пределах некоторого заданного радиуса R с точкой в ​​B (назовите его B'). Это означает, что каждая точка в A должна соответствовать либо 1, либо ни одной точке в B. Также было бы неплохо вернуть индексы списка совпадений между двумя массивами, однако в этом нет необходимости. Поскольку в этом радиусе R может быть много точек, кажется, лучше найти точку, ближайшую к A 'в B, а затем проверить, находится ли она в пределах радиуса R. Это проверяется просто с помощью формулы расстояния dx^2 + dy^2. Очевидно, существует решение грубой силы O (n ^ 2) для перебора обоих массивов, но мне нужно что-то более быстрое, надеюсь, O (n log n).

Я видел, что диаграмма Вороного может быть используется для такой проблемы, однако я не уверен, как это будет реализовано. Я не знаком с диаграммами Вороного, поэтому генерирую их с помощью scipy.spatial.Voronoi. Есть ли быстрый алгоритм решения этой проблемы с использованием этих диаграмм или есть другой?

Ответы [ 2 ]

1 голос
/ 16 июня 2020

Думаю, есть несколько вариантов. Я провел небольшой сравнительный тест, чтобы изучить некоторые из них. Первая пара из них - всего go, так как выясняется, сколько точек взаимно находятся в пределах радиуса друг друга, чтобы убедиться, что я получаю стабильные результаты по основной части проблемы. Он не отвечает на почту, касающуюся вашей проблемы с поиском ближайшего, что, я думаю, потребовало бы немного большей работы над некоторыми из них - сделал это для последнего варианта, см. Нижнюю часть сообщения. Драйвер проблемы выполняет все сравнения, и я думаю, вы можете сделать немного сена с помощью некоторой сортировки (последнее понятие здесь), чтобы ограничить сравнения.

Наивный Python

Используйте грубый принудительное двухточечное сравнение. Ясно, что O (n ^ 2).

Scipy's cdist module

Отлично и быстро работает с "небольшими" данными. При больших объемах данных это начинает разрушаться из-за размера вывода матрицы в памяти. Вероятно, невозможно для приложения размером 1M x 1M.

Scipy's KDTree module

Из другого решения. Быстро, но не так быстро, как cdist или «секционирование» (ниже). Возможно, есть другой способ использовать KDTree для этой задачи ... Я не очень разбираюсь в этом. Этот подход (ниже) казался логичным.

Секционирование массива сравнения

Это работает очень хорошо, потому что вас не интересуют все расстояний, вы просто хотите те, которые находятся в радиусе. Таким образом, сортируя целевой массив и просматривая только прямоугольное angular окно вокруг него в поисках «претендентов», вы можете получить очень высокую производительность с собственным python и без «взрыва памяти». Вероятно, все еще немного «оставлено на столе» для улучшения, возможно, путем встраивания cdist в эту реализацию или (глоток), пытаясь его многопоточность.

Другие идеи ...

Это жесткая "математика" l oop, поэтому попробовать что-то в cython или разделить один из массивов и использовать многопоточность было бы в новинку. И обработка результата, чтобы вам не приходилось запускать это, часто кажется благоразумным.

Я думаю, что вы можете легко дополнить кортежи индексом внутри массива, чтобы получить список совпадений.

Моя старая iMa c делает 100K x 100K за 90 секунд с помощью секционирования, так что это не сулит ничего хорошего для 1M x 1M

Сравнение:

# distance checker

from random import uniform
import time
import numpy as np
from scipy.spatial import distance, KDTree
from bisect import bisect
from operator import itemgetter
import sys
from matplotlib import pyplot as plt
sizes = [100, 500, 1000, 2000, 5000, 10000, 20000]
#sizes = [20_000, 30_000, 40_000, 50_000, 60_000]   # for the playoffs.  :)
naive_times = []
cdist_times = []
kdtree_times = []
sectioned_times = []
delta = 0.1

for size in sizes:
    print(f'\n *** running test with vectors of size {size} ***')
    r = 20  # radius to match
    r_squared = r**2

    A = [(uniform(-1000,1000), uniform(-1000,1000)) for t in range(size)]
    B = [(uniform(-1000,1000), uniform(-1000,1000)) for t in range(size)]

    # naive python
    print('naive python')
    tic = time.time()
    matches = [(p1, p2) for p1 in A
                        for p2 in B
                        if (p1[0] - p2[0])**2 + (p1[1] - p2[1])**2 <= r_squared]

    toc = time.time()
    print(f'found: {len(matches)}')
    naive_times.append(toc-tic)
    print(toc-tic)
    print()

    # using cdist module
    print('cdist')
    tic = time.time()
    dist_matrix = distance.cdist(A, B, 'euclidean')
    result = np.count_nonzero(dist_matrix<=r)
    toc = time.time()
    print(f'found: {result}')
    cdist_times.append(toc-tic)
    print(toc-tic)
    print()

    # KDTree
    print('KDTree')
    tic = time.time()
    my_tree = KDTree(A)
    results = my_tree.query_ball_point(B, r=r)
    # for count, r in enumerate(results):
    #   for t in r:
    #       print(count, A[t])

    result = sum(len(lis) for lis in results)
    toc = time.time()
    print(f'found: {result}')
    kdtree_times.append(toc-tic)
    print(toc-tic)
    print()

    # python with sort and sectioning
    print('with sort and sectioning')
    result = 0
    tic = time.time()
    B.sort()
    for point in A:
        # gather the neighborhood in x-dimension within x-r <= x <= x+r+1
        # if this has any merit, we could "do it again" for y-coord....
        contenders = B[bisect(B,(point[0]-r-delta, 0)) : bisect(B,(point[0]+r+delta, 0))]
        # further chop down to the y-neighborhood
        # flip the coordinate to support bisection by y-value
        contenders = list(map(lambda p: (p[1], p[0]), contenders))
        contenders.sort()
        contenders = contenders[bisect(contenders,(point[1]-r-delta, 0)) : 
                                bisect(contenders,(point[1]+r+delta, 0))]
        # note (x, y) in contenders is still inverted, so need to index properly
        matches = [(point, p2) for p2 in contenders if (point[0] - p2[1])**2 + (point[1] - p2[0])**2 <= r_squared]
        result += len(matches)
    toc = time.time()
    print(f'found: {result}')
    sectioned_times.append(toc-tic)
    print(toc-tic)
print('complete.')

plt.plot(sizes, naive_times, label = 'naive')
plt.plot(sizes, cdist_times, label = 'cdist')
plt.plot(sizes, kdtree_times, label = 'kdtree')
plt.plot(sizes, sectioned_times, label = 'sectioning')
plt.legend()
plt.show()

Результаты для один из размеров и графиков:

 *** running test with vectors of size 20000 ***
naive python
found: 124425
101.40657806396484

cdist
found: 124425
2.9293079376220703

KDTree
found: 124425
18.166933059692383

with sort and sectioning
found: 124425
2.3414530754089355
complete.

Примечание: на первом графике cdist накладывается на sectioning. Плей-офф показаны на втором графике. smaller sizes

«Плей-офф»

The playoffs

Измененный код секционирования

Это код находит минимум внутри точек в радиусе. Время выполнения эквивалентно приведенному выше коду секционирования.

print('with sort and sectioning, and min finding')
result = 0
pairings = {}  
tic = time.time()
B.sort()
def dist_squared(a, b): 
    # note (x, y) in point b will be inverted (below), so need to index properly
    return (a[0] - b[1])**2 + (a[1] - b[0])**2
for idx, point in enumerate(A):
    # gather the neighborhood in x-dimension within x-r <= x <= x+r+1
    # if this has any merit, we could "do it again" for y-coord....
    contenders = B[bisect(B,(point[0]-r-delta, 0)) : bisect(B,(point[0]+r+delta, 0))]
    # further chop down to the y-neighborhood
    # flip the coordinate to support bisection by y-value
    contenders = list(map(lambda p: (p[1], p[0]), contenders))
    contenders.sort()
    contenders = contenders[bisect(contenders,(point[1]-r-delta, 0)) : 
                            bisect(contenders,(point[1]+r+delta, 0))]
    matches = [(dist_squared(point, p2), point, p2) for p2 in contenders 
        if dist_squared(point, p2) <= r_squared]
    if matches:
        pairings[idx] = min(matches)[1]  # pair the closest point in B with the point in A
toc = time.time()
print(toc-tic)
1 голос
/ 16 июня 2020

Что вам, вероятно, нужно, это KDTrees (которые медленны в больших размерах, но должны быть невероятно быстрыми для вашей проблемы. Реализация python даже реализует ограничение радиуса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...