Быстрый способ выполнения последовательных однозначных вычислений для массивов Numpy без цикла for? - PullRequest
0 голосов
/ 10 ноября 2019

Я работаю над проблемой оптимизации, но чтобы не вдаваться в детали, я приведу простой пример ошибки, из-за которой у меня несколько дней болит голова.

Скажи, что яу меня есть двумерный массив с наблюдаемыми координатами xy:

from scipy.optimize import distance
x = np.array([1,2], [2,3], [4,5], [5,6])

У меня также есть список координат xy для сравнения с этими точками (y):

y = np.array([11,13], [12, 14])

У меня есть функция, котораяберет сумму манхэттенских разностей между значением x и всеми значениями в y:

def find_sum(ref_row, comp_rows):
    modeled_counts = []
    y = ref_row * len(comp_rows)
    res = list(map(distance.cityblock, ref_row, comp_rows))
    modeled_counts.append(sum(res))

    return sum(modeled_counts)

По сути, я хотел бы найти сумму расстояний в Манхэттене для каждого элементав y с каждым элементом в x (поэтому в основном для каждого элемента в x найдите сумму манхэттенских расстояний между этой (x, y) парой и каждой (x, y) парой в y).

Я пробовал это со следующей строкой кода:

z = list(map(find_sum, x, y))

Однако z имеет длину 2 (например, y), а не 4, как x,Есть ли способ убедиться, что z является результатом последовательных однозначных вычислений? То есть я хотел бы вычислить сумму всех разностей на Манхэттене между x[0] и каждым набором в y, и так далее, и так далее, поэтому длина z должна быть равна длинеx.

Есть ли простой способ сделать это без for цикла? Мои данные довольно большие (~ 4 миллиона строк), поэтому я очень ценю быстрые решения. Я довольно новичок в программировании на Python, поэтому любые объяснения того, почему решение работает и быстро, также приветствуются, но определенно не обязательно!

Спасибо!

Ответы [ 3 ]

1 голос
/ 10 ноября 2019

Вот как вы можете сделать это, используя простую трансляцию с упрощенным объяснением

Настройка формы для трансляции

import numpy as np

start_points = np.array([[1,2], [2,3], [4,5], [5,6]])
dest_points = np.array([[11,13], [12, 14]])

## using np.newaxis as index add a new dimension at that position
## : give all the elements on that dimension
start_points = start_points[np.newaxis, :, :]
dest_points = dest_points[:, np.newaxis, :]

## Now lets check he shape of the point arrays
print('start_points.shape: ', start_points.shape) # (1, 4, 2)
print('dest_points.shape', dest_points.shape) # (2, 1, 2)

Давайте попробуем разобраться

  • последний элемент фигуры представляет собой x и y точки размером 2
  • , мы можем думать о start_points как о наличии 1 строки и 4 столбцов точек
  • мы можем думать о dest_points как о наличии 2 строк и 1 столбца точек

Мы можем думать start_points и dest_points как матрицу или таблицу точек размера (1X4) и (2X1). четко видно, что размеры не совместимы. Что будет, если мы выполним арифметическую операцию между ними? Вот где появляется умная часть numpy, называемая широковещательной.

  • Она будет повторять строки start_points, чтобы соответствовать матрице dest_point, составляющей (2X4)
  • Itбудет повторять столбцы dest_point, чтобы соответствовать столбцу start_points, составляющему матрицу (2X4)
  • Результатом является арифметическая операция между каждой парой элементов в start_points и dest_points

Рассчитайте расстояние

diff_x_y = start_points - dest_points
print(diff_x_y.shape) # (2, 4, 2)
abs_diff_x_y = np.abs(start_points - dest_points)
man_distance = np.sum(abs_diff_x_y, axis=2)
print('man_distance:\n', man_distance)
sum_distance = np.sum(man_distance, axis=0)
print('sum_distance:\n', sum_distance)

Oneliner

start_points = np.array([[1,2], [2,3], [4,5], [5,6]])
dest_points = np.array([[11,13], [12, 14]])
np.sum(np.abs(start_points[np.newaxis, :, :] - dest_points[:, np.newaxis, :]), axis=(0,2))

Здесь более подробно объяснение вещания , если вы хотите понятьэто больше

1 голос
/ 10 ноября 2019

Это решение реализует расстояние в numpy, так как я думаю, что это хороший пример широковещания , что очень полезно знать, если вам нужно использовать массивы и матрицы.

По определению манхэттенского расстояния необходимо оценить сумму абсолютных значений разности между каждым столбцом. Однако первый столбец x, x[:, 0] имеет форму (4,), а первый столбец y, y[:, 0] имеет форму (2,), поэтому они несовместимы в смысле применениявычитание: свойство широковещания говорит, что каждая фигура сравнивается, начиная с конечных измерений, и два измерения совместимы, когда они равны или одно из них равно 1. К сожалению, ни одно из них не подходит для ваших столбцов.

ОднакоВы можете добавить новое измерение значения 1, используя np.newaxis, поэтому

x[:, 0]

равно array([1, 2, 4, 5]), но

x[:, 0, np.newaxis]

равно

array([[1],
       [2],
       [4],
       [5]])

и его форма (4, 1). Теперь матрица формы (4, 1), вычтенная из массива формы 2, приводит к матрице формы (4, 2) с помощью радиовещательной обработки numpy:

   4 x 1
       2
=  4 x 2

. Вы можетеполучите различия для каждого столбца:

first_column_difference = x[:, 0, np.newaxis] - y[:, 0]
second_column_difference = x[:, 1, np.newaxis] - y[:, 1]

и оцените сумму их абсолютных значений:

np.abs(first_column_difference) + np.abs(second_column_difference)

, что приведет к матрице (4, 2). Теперь вы хотите суммировать значения для каждой строки, чтобы у вас было 4 значения:

np.sum(np.abs(first_column_difference) + np.abs(second_column_difference), axis=1)

, что приводит к array([73, 69, 61, 57]). Правило простое: параметр axis исключит это измерение из результата, поэтому при использовании axis=1 для матрицы (4, 2) генерируется 4 значения - если вы используете axis=0, то генерирует 2 значения.

Итак, это решит вашу проблему:

x = np.array([[1, 2], [2, 3], [4, 5], [5, 6]])
y = np.array([[11, 13], [12, 43]])

first_column_difference = x[:, 0, np.newaxis] - y[:, 0]
second_column_difference = x[:, 1, np.newaxis] - y[:, 1]
z = np.abs(first_column_difference) + np.abs(second_column_difference)
print(np.sum(z, axis=1))

Вы также можете пропустить промежуточные шаги для каждого столбца и оценить все сразу (это немного сложнее понять, поэтому я предпочитаюописанный выше метод для объяснения происходящего):

print(np.abs(x[:, np.newaxis] - y).sum(axis=(1, 2)))

Это общий случай для n-мерного манхэттенского расстояния: если x равно (u, n) и y равно (v, n), он генерирует u строки, передавая (u, 1, n) на (v, n) = (u, v, n), затем применяя sum, чтобы исключить вторую и третью оси.

0 голосов
/ 10 ноября 2019

С таким количеством строк вы можете существенно сэкономить, используя интеллектуальный алгоритм. Давайте для простоты предположим, что есть только одно измерение;как только мы установили алгоритм, возвращение к общему случаю является простым делением суммы по координатам.

Наивный алгоритм - O(mn), где m,n - размеры наборов X,Y. Наш алгоритм O((m+n)log(m+n)), поэтому он масштабируется намного лучше.

Сначала мы должны отсортировать объединение X и Y по координатам, а затем сформировать cumsum по Y. Затем мы находим для каждого x in X число YbefX из y in Y слева от него и используем его для поиска соответствующего cumsum элемента YbefXval. Суммированные расстояния до всех y слева от x представляют собой YbefX кратные координаты x минус YbefXval, расстояния до всех y справа являются суммой всех y координат минус YbefXval минус n - YbefX временная координата x.

Откуда берется экономия? Сортировка координат позволяет нам перерабатывать суммирование, которое мы делали ранее, вместо того, чтобы начинать каждый раз с нуля. При этом используется тот факт, что вплоть до знака мы всегда суммируем одни и те же y координаты и, двигаясь слева направо, знаки переворачиваются один за другим.

Код:

import numpy as np
from scipy.spatial.distance import cdist
from timeit import timeit

def pp(X,Y):
    (m,k),(n,k) = X.shape,Y.shape
    XY = np.concatenate([X.T,Y.T],1)
    idx = XY.argsort(1)
    Xmsk = idx<m
    Ymsk = ~Xmsk
    Xidx = np.arange(k)[:,None],idx[Xmsk].reshape(k,m)
    Yidx = np.arange(k)[:,None],idx[Ymsk].reshape(k,n)
    YbefX = Ymsk.cumsum(1)[Xmsk].reshape(k,m)
    YbefXval = XY[Yidx].cumsum(1)[np.arange(k)[:,None],YbefX-1]
    YbefXval[YbefX==0] = 0
    XY[Xidx] = ((2*YbefX-n)*XY[Xidx]) - 2*YbefXval + Y.sum(0)[:,None]
    return XY[:,:m].sum(0)

def summed_cdist(X,Y):
    return cdist(X,Y,"minkowski",p=1).sum(1)

# demo    
m,n,k = 1000,500,10
X,Y = np.random.randn(m,k),np.random.randn(n,k)
print("same result:",np.allclose(pp(X,Y),summed_cdist(X,Y)))
print("sort       :",timeit(lambda:pp(X,Y),number=1000),"ms")
print("scipy cdist:",timeit(lambda:summed_cdist(X,Y),number=100)*10,"ms")

Пример выполнения, сравнивая умный алгоритм «сортировки» с наивным алгоритмом, реализованным с использованием библиотечной функции cdist:

same result: True
sort       : 1.4447695480193943 ms
scipy cdist: 36.41934019047767 ms
...