Эффективное вычисление всех попарных произведений элементов данного вектора в NumPy - PullRequest
2 голосов
/ 26 мая 2020

Я ищу «оптимальный» способ вычисления всех попарных произведений элементов данного вектора. Если вектор имеет размер N, на выходе будет вектор размером N * (N + 1) // 2 и будет содержать x[i] * x[j] значения для всех пар (i, j) с i <= j. Наивный способ вычислить это следующим образом:

import numpy as np

def get_pairwise_products_naive(vec: np.ndarray):
    k, size = 0, vec.size
    output = np.empty(size * (size + 1) // 2)
    for i in range(size):
        for j in range(i, size):
            output[k] = vec[i] * vec[j]
            k += 1
    return output

Desiderata:

  • Минимизация выделения / использования дополнительной памяти: если возможно, напрямую записывать в выходной буфер.
  • Используйте векторизованные NumPy подпрограммы вместо явных циклов.
  • Избегайте лишних (ненужных) вычислений.

Я играл с такими подпрограммами, как outer, triu_indices и einsum, а также некоторые уловки с индексированием / просмотром, но не удалось найти решение, которое соответствует указанным выше требованиям.

Ответы [ 3 ]

3 голосов
/ 26 мая 2020

Подход № 1

Для векторизованного с NumPy вы можете использовать маскирующий после получения всех попарных умножений с внешним умножением, например -

def pairwise_multiply_masking(a):
    return (a[:,None]*a)[~np.tri(len(a),k=-1,dtype=bool)]

Подход № 2

Для действительно больших входных 1D-массивов мы можем захотеть прибегнуть к итеративному методу slicing, который использует one-l oop -

def pairwise_multiply_iterative_slicing(a):
    n = len(a)
    N = (n*(n+1))//2
    out = np.empty(N, dtype=a.dtype)
    c = np.r_[0,np.arange(n,0,-1)].cumsum()
    for ii,(i,j) in enumerate(zip(c[:-1],c[1:])):
        out[i:j] = a[ii:]*a[ii]
    return out

Сравнительный анализ

Мы включим pairwise_products и pairwise_products_numba из решения @ orlp в настройку.

Использование пакета benchit (несколько инструментов тестирования собраны вместе; отказ от ответственности: я являюсь его автором) для тестирования предлагаемых решений.

import benchit
funcs = [pairwise_multiply_masking, pairwise_multiply_iterative_slicing, pairwise_products_numba, pairwise_products]
in_ = [np.random.rand(n) for n in [10,50,100,200,500,1000,5000]]
t = benchit.timings(funcs, in_)
t.plot(logx=True, save='timings.png')
t.speedups(-1).plot(logx=True, logy=False, save='speedups.png')

Результаты (тайминги и ускорение более pairwise_products) -

enter image description here

enter image description here

Как видно из графиков тенденций, для действительно больших массивов вариант, основанный на нарезке, начнет выигрывать, в противном случае вариант с векторизацией будет работать хорошо.

Предложения

  • Мы также можем изучить * 10 52 * для более эффективного выполнения внешних умножений для больших массивов.
3 голосов
/ 26 мая 2020

Я бы, вероятно, вычислил M = v T v , а затем сгладил бы нижнюю или верхнюю tri angular часть этой матрицы.

def pairwise_products(v: np.ndarray):
    assert len(v.shape) == 1
    n = v.shape[0]
    m = v.reshape(n, 1) @ v.reshape(1, n)
    return m[np.tril_indices_from(m)].ravel()

I также хотел бы упомянуть numba, что сделает ваш «наивный» подход, скорее всего, быстрее, чем этот.

import numba

@numba.njit
def pairwise_products_numba(vec: np.ndarray):
    k, size = 0, vec.size
    output = np.empty(size * (size + 1) // 2)
    for i in range(size):
        for j in range(i, size):
            output[k] = vec[i] * vec[j]
            k += 1
    return output

Простое тестирование приведенного выше pairwise_products(np.arange(5000)) занимает ~ 0,3 секунды c, тогда как версия numba занимает ~ 0,05 сек. c (без учета первого запуска, который используется для своевременной компиляции функции).

0 голосов
/ 27 мая 2020

Вы также можете распараллелить этот алгоритм. Если бы можно было выделить достаточно большой массив (меньший вид этого массива почти ничего не стоит) только один раз и перезаписать его впоследствии, можно будет добиться большего ускорения.

Пример

@numba.njit(parallel=True)
def pairwise_products_numba_2_with_allocation(vec):
    k, size = 0, vec.size
    k_vec=np.empty(vec.size,dtype=np.int64)
    output = np.empty(size * (size + 1) // 2)

    #precalculate the indices
    for i in range(size):
        k_vec[i] = k
        k+=(size-i)

    for i in numba.prange(size):
        k=k_vec[i]
        for j in range(size-i):
            output[k+j] = vec[i] * vec[j+i]

    return output

@numba.njit(parallel=True)
def pairwise_products_numba_2_without_allocation(vec,output):
    k, size = 0, vec.size
    k_vec=np.empty(vec.size,dtype=np.int64)

    #precalculate the indices
    for i in range(size):
        k_vec[i] = k
        k+=(size-i)

    for i in numba.prange(size):
        k=k_vec[i]
        for j in range(size-i):
            output[k+j] = vec[i] * vec[j+i]

    return output

Время

A=np.arange(5000)
k, size = 0, A.size
output = np.empty(size * (size + 1) // 2)

%timeit res_1=pairwise_products_numba_2_without_allocation(A,output)
#7.84 ms ± 116 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_2=pairwise_products_numba_2_with_allocation(A)
#16.9 ms ± 325 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_3=pairwise_products_numba(A) #@orlp
#43.3 ms ± 134 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
...