Эффективный способ векторизации расчета расстояния - PullRequest
2 голосов
/ 25 апреля 2019

Для моего исследования я должен реализовать парное вычисление расстояния L1-расстояния между двумя наборами векторов, каждый из которых представлен в виде матрицы NumPy (векторы - строки).Это должно быть сделано с использованием двух циклов, одного цикла и без циклов.Я ожидал, что, поскольку NumPy так хорош в векторизации, алгоритмы должны ранжироваться как два цикла медленнее, чем один цикл, медленнее, чем без циклов.

Я написал функции:

def f_cdist_2(X1, X2):
    res = np.zeros(shape=(X1.shape[0], X2.shape[0]), dtype=np.float64)

    for ix1 in range(X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            res[ix1, ix2] = np.abs(X1[ix1, :] - X2[ix2, :]).sum()

    return res


def f_cdist_1(X1, X2):
    res = np.zeros(shape=(X1.shape[0], X2.shape[0]), dtype=np.float64)

    for ix1 in range(X1.shape[0]):
        res[ix1, :] = np.abs(np.tile(X1[ix1, :], (X2.shape[0], 1)) - X2).sum(axis=1)

    return res


def f_cdist_0(X1, X2):
    res = np.abs(
            np.tile(X1[:, :, np.newaxis], (1, 1, X2.shape[0])) - \
            np.tile(X2.T[np.newaxis, :, :], (X1.shape[0], 1, 1))
    ).sum(axis=1)

    return res

Затем я проверил производительность с двумя случайными матрицами форм 128 x 512 и 256 x 512, основываясь на 100 прогонах. Я получил результаты:

  1. Два цикла: 156 мс

  2. Один цикл: 32 мсек

  3. Нет циклов: 135 мсек

Я также пробовал cdistс scipy.spatial.distance и получил лучшую производительность из всех: 9 мсек.

Теперь, есть ли лучший способ реализовать функцию без циклов?Я надеялся, что он будет работать по крайней мере так же хорошо, как один цикл, но сейчас я в растерянности относительно того, как его улучшить.

ОБНОВЛЕНИЕ

Использование kwinkunks Реализация подхода без циклов, я перезапустил тесты (снова 100 испытаний) на матрицах 1024 x 1024, результаты ниже:

  1. Двапетли: 5,7 с

  2. Одна петля: 6,6 с

  3. Без циклов: 3,9 с

  4. scipy.spatial.distance.cdist: 0,6 с

Так что на больших матрицах реализация без циклов действительно работает лучше.scipy творит чудеса, но, если я правильно понимаю, он написан на C, поэтому такая великолепная производительность.

ОБНОВЛЕНИЕ

Пробовал с 4096 x 1024 матрицnp.float64, та же настройка:

  1. Два цикла: 88 секунд

  2. Один цикл: 66 секунд

  3. Без циклов: не хватило памяти (на данный момент было ~ 18 ГБ свободной оперативной памяти)

  4. scipy.spatial.distance.cdist: 13 с

Ответы [ 3 ]

4 голосов
/ 27 апреля 2019

Вы можете получить дополнительное ускорение от векторизованной версии, используя Pythran

f_dist.py:

import numpy as np
#pythran export f_dist(float64[:,:], float64[:,:])
def f_dist(X1, X2):
    return np.sum(np.abs(X1[:, None, :] - X2), axis=-1)

На моем ноутбуке оригинальная версия работает по адресу:

> python -m timeit -s 'from f_dist import f_dist; from numpy.random import random; x = random((100,100)); y = random((100,100))' 'f_dist(x, y)'
100 loops, best of 3: 7.05 msec per loop

После компиляции ядра:

> pythran f_dist.py

Вы можете сравнить его:

> python -m timeit -s 'from f_dist import f_dist; from numpy.random import random; x = random((100,100)); y = random((100,100))' 'f_dist(x, y)'
1000 loops, best of 3: 1.21 msec per loop

Использование инструкций SIMD дополнительно ускоряет вычисления:

> pythran f_dist.py -DUSE_XSIMD -march=native
> python -m timeit -s 'from f_dist import f_dist; from numpy.random import random; x = random((100,100)); y = random((100,100))' 'f_dist(x, y)'
1000 loops, best of 3: 774 usec per loop

Отказ от ответственности: я главный разработчик проекта pythran.

0 голосов
/ 29 апреля 2019

Решение с использованием Numba

  • Распараллелено (на очень маленьких выборках, например (24x24) распараллеленная версия будет медленнее из-за накладных расходов на создание потоков)
  • Внутренний цикл SIMD-векторизован

Exmaple

import numpy as np
import numba as nb

#Debug output for SIMD-vectorization
import llvmlite.binding as llvm
llvm.set_option('', '--debug-only=loop-vectorize')
########################################

#Your solution
#You can also use Numba on this, but apart from parallization
#it is often better to write out the inner loop
def f_cdist(X1, X2):
    res = np.zeros(shape=(X1.shape[0], X2.shape[0]), dtype=np.float64)

    for ix1 in range(X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            res[ix1, ix2] = np.abs(X1[ix1, :] - X2[ix2, :]).sum()

    return res

@nb.njit(fastmath=True,parallel=True)
def f_cdist_nb(X1, X2):
    #Some safety, becuase there is no bounds-checking
    assert X1.shape[1]==X2.shape[1]
    res = np.empty(shape=(X1.shape[0], X2.shape[0]), dtype=X1.dtype)

    for ix1 in nb.prange(X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            #Writing out the inner loop often leads to better performance
            sum=0.
            for i in range(X1.shape[1]):
                sum+=np.abs(X1[ix1, i] - X2[ix2, i])
            res[ix1, ix2] = sum

    return res

Perfomance

from scipy import spatial
#4096x1024    
X1=np.random.rand(4096,1024)
X2=np.random.rand(4096,1024)

res1=f_cdist_nb(X1,X2)
res2=f_cdist(X1,X2)
res3=spatial.distance.cdist(X1, X2, 'cityblock')

#Check the results
np.allclose(res1,res2)
True
np.allclose(res1,res3)
True

%timeit res1=f_cdist_nb(X1,X2)
1.38 s ± 64.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res2=f_cdist(X1,X2)
1min 25s ± 483 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res3=spatial.distance.cdist(X1, X2, 'cityblock')
17.6 s ± 18.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#1024x1024
X1=np.random.rand(1024,1024)
X2=np.random.rand(1024,1024)

%timeit res1=f_cdist_nb(X1,X2)
63.5 ms ± 3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit res3=spatial.distance.cdist(X1, X2, 'cityblock')
1.09 s ± 3.04 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#512x512
X1=np.random.rand(512,512)
X2=np.random.rand(512,512)

%timeit res1=f_cdist_nb(X1,X2)
4.91 ms ± 280 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit res3=spatial.distance.cdist(X1, X2, 'cityblock')
130 ms ± 150 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Редактировать: оптимизированная для рук версия Numba

#Unroll and Jam loops
@nb.njit(fastmath=True,parallel=True)
def f_cdist_nb_3(X1, X2):
    assert X1.shape[1]==X2.shape[1]
    res = np.empty(shape=(X1.shape[0], X2.shape[0]), dtype=X1.dtype)

    for ix1 in nb.prange(X1.shape[0]//4):
        for ix2 in range(X2.shape[0]//4):
            sum_1,sum_2,sum_3,sum_4,sum_5,sum_6   =0.,0.,0.,0.,0.,0.
            sum_7,sum_8,sum_9,sum_10,sum_11,sum_12=0.,0.,0.,0.,0.,0.
            sum_13,sum_14,sum_15,sum_16=0.,0.,0.,0.

            for i in range(X1.shape[1]):
                sum_1+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+0, i])
                sum_2+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+1, i])
                sum_3+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+2, i])
                sum_4+=np.abs(X1[ix1*4+0, i] - X2[ix2*4+3, i])
                sum_5+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+0, i])
                sum_6+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+1, i])
                sum_7+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+2, i])
                sum_8+=np.abs(X1[ix1*4+1, i] - X2[ix2*4+3, i])
                sum_9+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+0, i])
                sum_10+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+1, i])
                sum_11+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+2, i])
                sum_12+=np.abs(X1[ix1*4+2, i] - X2[ix2*4+3, i])
                sum_13+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+0, i])
                sum_14+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+1, i])
                sum_15+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+2, i])
                sum_16+=np.abs(X1[ix1*4+3, i] - X2[ix2*4+3, i])

            res[ix1*4+0, ix2*4+0] = sum_1
            res[ix1*4+0, ix2*4+1] = sum_2
            res[ix1*4+0, ix2*4+2] = sum_3
            res[ix1*4+0, ix2*4+3] = sum_4
            res[ix1*4+1, ix2*4+0] = sum_5
            res[ix1*4+1, ix2*4+1] = sum_6
            res[ix1*4+1, ix2*4+2] = sum_7
            res[ix1*4+1, ix2*4+3] = sum_8
            res[ix1*4+2, ix2*4+0] = sum_9
            res[ix1*4+2, ix2*4+1] = sum_10
            res[ix1*4+2, ix2*4+2] = sum_11
            res[ix1*4+2, ix2*4+3] = sum_12
            res[ix1*4+3, ix2*4+0] = sum_13
            res[ix1*4+3, ix2*4+1] = sum_14
            res[ix1*4+3, ix2*4+2] = sum_15
            res[ix1*4+3, ix2*4+3] = sum_16

    #Rest of the loop
    for ix1 in range(X1.shape[0]//4*4,X1.shape[0]):
        for ix2 in range(X2.shape[0]):
            sum_1=0.
            for i in range(X1.shape[1]):
                sum_1+=np.abs(X1[ix1, i] - X2[ix2, i])
            res[ix1, ix2] = sum_1

    for ix1 in range(X1.shape[0]):
        for ix2 in range(X2.shape[0]//4*4,X2.shape[0]):
            sum_1=0.
            for i in range(X1.shape[1]):
                sum_1+=np.abs(X1[ix1, i] - X2[ix2, i])
            res[ix1, ix2] = sum_1
    return res

Задержка

#4096x1024    
X1=np.random.rand(4096,1024)
X2=np.random.rand(4096,1024)

res1=f_cdist_nb(X1,X2)
res2=f_cdist_nb_3(X1,X2)
res3=spatial.distance.cdist(X1, X2, 'cityblock')

#Check the results
print(np.allclose(res1,res2))
print(np.allclose(res1,res3))

%timeit res1=f_cdist_nb(X1,X2)
1.6 s ± 199 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res2=f_cdist_nb_3(X1,X2)
497 ms ± 50.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res3=spatial.distance.cdist(X1, X2, 'cityblock')
17.7 s ± 118 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
0 голосов
/ 25 апреля 2019

Вы можете избежать тайлинга и т. Д. С помощью трансляции NumPy:

def f_dist(X1, X2):
    return np.sum(np.abs(X1[:, None, :] - X2), axis=-1)

Но, на удивление (во всяком случае, для меня), он не быстрее вашего цикла (около 90 мс на моей машине, по сравнению с 24 мс для вашей функции f_cdist_1()).

Этот прием с вещанием часто бывает полезен. Это означает, что вы можете делать такие вещи:

>>> np.array([1,2,3]) * np.array([10, 20, 30])[:, None]
array([[10, 20, 30],
       [20, 40, 60],
       [30, 60, 90]])
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...