Эффективно суммируйте сложные матричные произведения с помощью Numpy - PullRequest
0 голосов
/ 31 декабря 2018

У меня есть матрица, X, для которой я вычисляю взвешенную сумму промежуточных матричных произведений.Вот минимальный воспроизводимый пример:

import numpy as np

random_state = np.random.RandomState(1)
n = 5
p = 10

X = random_state.rand(p, n) # 10x5
X_sum = np.zeros((n, n)) # 5x5

# The length of weights are not related to X's dims,
# but will always be smaller
y = 3
weights = random_state.rand(y)

for k in range(y):
    X_sum += np.dot(X.T[:, k + 1:],
                    X[:p - (k + 1), :]) * weights[k]

Это прекрасно работает и дает ожидаемые результаты.Однако с ростом размеров n и y (в сотни) это становится чрезвычайно дорогостоящим, поскольку многократное вычисление матричных продуктов не совсем эффективно ...

Существует очевидная закономерность того, какоднако вычисляются продукты:

First iteration

Second iteration

Вы можете видеть, как ход итерацийначальный срез столбца в Xt перемещается вправо, а конечный ряд в X перемещается вверх.Вот как должна выглядеть N-я итерация:

Nth iteration

Это фактически означает, что подмножество одинаковых значений многократно умножается (см. edit 2), которая, как мне кажется, может иметь возможность использовать ... (т. Е. Если бы я вручную вычислял продукт за один проход).

Но я надеюсь, что не придетсясделайте что-нибудь вручную, и что есть хороший способ сделать этот цикл более элегантным с помощью Numpy.

Edit 1

Реалистичный набор чисел:

n = 400
p = 2000
y = 750

Изменить 2

Чтобы ответить на комментарий:

Не могли бы вы объяснить, какие значения многократно умножаются?

Рассмотрим следующий массив:

n = p = 5
X = np.arange(25).reshape(p, n)

Для k=0 первый продукт будет между A и B:

k = 0
A = X.T[:, k + 1:]
B = X[:p - (k + 1), :]
>>> A
array([[ 5, 10, 15, 20],
       [ 6, 11, 16, 21],
       [ 7, 12, 17, 22],
       [ 8, 13, 18, 23],
       [ 9, 14, 19, 24]])
>>> B
array([[ 0,  1,  2,  3,  4],
       [ 5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14],
       [15, 16, 17, 18, 19]])

И когда k=1:

k = 1
>>> A
array([[10, 15, 20],
       [11, 16, 21],
       [12, 17, 22],
       [13, 18, 23],
       [14, 19, 24]])
>>> B
array([[ 0,  1,  2,  3,  4],
       [ 5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14]])

Так каждый последующийМатричный продукт - это подмножество предыдущего продукта, если это имеет смысл.

Ответы [ 2 ]

0 голосов
/ 31 декабря 2018

TLDR;Я бы выбрал @ Parfait для использования test_gen_sum на основе сравнительного анализа различных значений n, p и y.Сохранение старого ответа здесь для преемственности .

Оцените, как n, p, y влияют на выбор алгоритма

Этот анализ выполняется с использованием функций @ Parfait каксредство определения, действительно ли существует одно лучшее решение или существует семейство решений на основе значений n, p и y.

import numpy as np
import pytest # This code also requires the pytest-benchmark plugin

def test_for_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    for k in range(y):
        X_sum += np.dot(X.T[:, k + 1:],
                    X[:p - (k + 1), :]) * weights[k]

    return X_sum


def test_list_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    matrix_list = [np.dot(X.T[:, k + 1:],
                      X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = np.sum(matrix_list, axis=0)

    return X_sum


def test_reduce_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    matrix_list = [(X.T[:, k + 1:] @
                X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = reduce(lambda x,y: x + y, matrix_list)

    return X_sum


def test_concat_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    x_mat = np.concatenate([np.matmul(X.T[:, k + 1:],
                                  X[:p - (k + 1), :]) for k in range(y)])

    wgt_mat = np.concatenate([np.full((n,1), weights[k]) for k in range(y)])

    mul_res = x_mat * wgt_mat        
    X_sum = mul_res.reshape(-1, n, n).sum(axis=0)

    return X_sum


def test_matmul_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    # Use list comprehension and np.matmul 
    matrices_list = [np.matmul(X.T[:, k + 1:],
                           X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    # Sum matrices in list of matrices to get the final result   
    X_sum = np.sum(matrices_list, axis=0)

    return X_sum


def test_gen_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    matrix_gen = (np.dot(X.T[:, k + 1:],
                     X[:p - (k + 1), :]) * weights[k] for k in range(y))

    X_sum = sum(matrix_gen)

    return X_sum

parameters = [
    pytest.param(400, 800, 3)
    ,pytest.param(400, 2000, 3)
    ,pytest.param(400, 800, 750)
    ,pytest.param(400, 2000, 750)
]

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_for_sum(benchmark, n, p, y):
    benchmark(test_for_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_list_sum(benchmark, n, p, y):
     benchmark(test_list_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_reduce_sum(benchmark, n, p, y):
    benchmark(test_reduce_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_concat_sum(benchmark, n, p, y):
    benchmark(test_concat_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_matmul_sum(benchmark, n, p, y):
    benchmark(test_matmul_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_gen_sum(benchmark, n, p, y):
    benchmark(test_gen_sum, n=n, p=p, y=y)
  • n=400, p=800, y=3 (100 итераций)

    • победитель: test_gen_sumenter image description here
  • n=400, p=2000, y=3 (100 итераций)

    • победитель: test_gen_sum enter image description here
  • n=400, p=800, y=750 (10 итераций)

    • победитель: test_gen_sum enter image description here
  • n=400, p=2000,y=750 (10 итераций)

    • победитель: test_gen_sum enter image description here

СТАРЫЙ ОТВЕТ

Меньше y значения

Я бы определенно использовал np.matmul вместо np.dot, это даст вам самый большой прирост производительности и фактическидокументация для np.dot направит вас к np.matmul для умножения двумерного массива вместо np.dot.

Я тестировал np.dot и np.matmul с и без понимания списка, а также pytest-benchmark результаты приведены здесь:

y=3

Кстати pytest-benchmark довольно ловкий, и я настоятельно рекомендую в такие моменты проверять, действительно ли подход действительно эффективен.

Простое использование списочного понимания имеетпочти незначительное влияние на np.matmul результаты и отрицательное влияние на np.dot (хотя это и лучшая форма) в схеме вещей, но комбинация обоих изменений дала наилучшие результаты в терминах.Я хотел бы предупредить, что использование списочных представлений обычно приводит к увеличению скорости.девиациявремени выполнения, поэтому вы можете увидеть большие диапазоны производительности, чем если бы вы просто использовали np.matmul.

Вот код:

import numpy as np

def test_np_matmul_list_comprehension():
    random_state = np.random.RandomState(1)
    n = p = 1000
    X = np.arange(n * n).reshape(p, n)

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 3
    weights = [1, 1, 1]
    # Use list comprehension and np.matmul 
    matrices_list = [np.matmul(X.T[:, k + 1:],
                             X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    # Sum matrices in list of matrices to get the final result   
    X_sum = np.sum(matrices_list, axis=0)

Большие y значения

При больших значениях y лучше не использовать списки.Среднее / среднее время выполнения имеет тенденцию быть больше как для np.dot, так и np.matmul в обоих этих случаях.Вот результаты pytest-benchmark для (n=500, p=5000, y=750):

enter image description here

Это, вероятно, излишне, но япредпочел бы ошибиться на стороне того, чтобы быть слишком полезным:).

0 голосов
/ 31 декабря 2018

Рассмотрим следующие перефакторизованные версии по сравнению с итеративными суммами вызовов в цикле for.В новых версиях, использующих reduce, генератор и np.concatenate, результат немного быстрее, но все равно сравним с циклом for.Каждый работает с n = 400, p = 800, y = 750.

OP Оригинальная версия

import numpy as np

def test_for_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    for k in range(y):
        X_sum += np.dot(X.T[:, k + 1:],
                        X[:p - (k + 1), :]) * weights[k]

    return X_sum

Список понимания с np.dot

def test_list_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    matrix_list = [np.dot(X.T[:, k + 1:],
                          X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = sum(matrix_list)

    return X_sum

Версия генератора

def test_gen_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    matrix_gen = (np.dot(X.T[:, k + 1:],
                         X[:p - (k + 1), :]) * weights[k] for k in range(y))

    X_sum = sum(matrix_gen)

    return X_sum

Уменьшить версию (используя новый оператор @ - синтаксический сахар - вместо np.matmul)

from functools import reduce

def test_reduce_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    matrix_list = [(X.T[:, k + 1:] @
                    X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = reduce(lambda x,y: x + y, matrix_list)

    return X_sum

Конкатенированная версия

def test_concat_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    x_mat = np.concatenate([np.matmul(X.T[:, k + 1:],
                                      X[:p - (k + 1), :]) for k in range(y)])

    wgt_mat = np.concatenate([np.full((n,1), weights[k]) for k in range(y)])

    mul_res = x_mat * wgt_mat        
    X_sum = mul_res.reshape(-1, n, n).sum(axis=0)

    return X_sum

Понимание списка с помощью np.matmul

def test_matmul_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    # Use list comprehension and np.matmul 
    matrices_list = [np.matmul(X.T[:, k + 1:],
                               X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    # Sum matrices in list of matrices to get the final result   
    X_sum = np.sum(matrices_list, axis=0)

    return X_sum

Время

import time

start_time = time.time()
res_for = test_for_sum()
print("SUM: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_list = test_list_sum()
print("LIST: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_gen = test_gen_sum()
print("GEN: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_reduce= test_reduce_sum()
print("REDUCE: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_concat = test_concat_sum()
print("CONCAT: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_matmul = test_matmul_sum()
print("MATMUL: {} seconds ---".format(time.time() - start_time))

Проверка на равенство

print(np.array_equal(res_for, res_list))
# True
print(np.array_equal(res_for, res_gen))
# True
print(np.array_equal(res_for, res_reduce))
# True
print(np.array_equal(res_for, res_concat))
# True
print(np.array_equal(res_for, res_matmul))
# True

Первый запуск

# SUM: 21.569773197174072 seconds ---
# LIST: 23.576102018356323 seconds ---
# GEN: 21.385253429412842 seconds ---
# REDUCE: 21.426464080810547 seconds ---
# CONCAT: 21.059731483459473 seconds ---
# MATMUL: 23.57494807243347 seconds ---

Второй запуск

# SUM: 21.6339168548584 seconds ---
# LIST: 19.767740488052368 seconds ---
# GEN: 23.86947798728943 seconds ---
# REDUCE: 19.880712032318115 seconds ---
# CONCAT: 20.761067152023315 seconds ---
# MATMUL: 23.55513620376587 seconds ---

Третий прогон

# SUM: 22.764745473861694 seconds ---
# LIST: 19.953850984573364 seconds ---
# GEN: 24.37714171409607 seconds ---
# REDUCE: 22.54508638381958 seconds ---
# CONCAT: 21.20585823059082 seconds ---
# MATMUL: 22.303589820861816 seconds ---
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...