Numpy упорядоченное вычитание элементов с полной векторизацией - PullRequest
0 голосов
/ 12 февраля 2019

Представьте mxn массив a и 1xn массив b, мы хотим вычесть b из a так, чтобы b вычиталось из первого элемента a,затем максимум из нуля и b-a[0] вычитается из a[1], и так далее ...

Итак:

x = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
a = np.repeat(x, 100000).reshape(10, 100000)
b = np.repeat(np.array([5]), 100000).reshape(1, 100000)

Итак, мы хотим получить: [ 0, 0, 1, 4, 5, 6, 7, 8, 9, 10], повторяется 100 000раз.

Я управлял функцией ниже, которая обеспечивает желаемый результат:

def func(a, b):
    n = np.copy(a)
    m = np.copy(b)
    for i in range(len(n)):
        n[i] = np.where(n[i] >= m, n[i] - m, 0)
        m = np.maximum(0, m - a[i])
        if not m.any():
            return n
    return n

Однако, он не полностью векторизован.Итак:

>> timeit func(a, b)
3.23 ms ± 52.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

В идеале я бы хотел избавиться от цикла for и сделать его как можно более векторизованным.

Спасибо.

1 Ответ

0 голосов
/ 12 февраля 2019

Я думаю, вы можете векторизовать свою функцию следующим образом:

import numpy as np

def func_vec(a, b):
    ar = np.roll(a, 1, axis=0)
    ar[0] = 0
    ac = np.cumsum(ar, axis=0)
    bc = np.maximum(b - ac, 0)
    return np.maximum(a - bc, 0)

Быстрый тест:

import numpy as np

def func(a, b):
    n = np.copy(a)
    m = np.copy(b)
    for i in range(len(n)):
        n[i] = np.where(n[i] >= m, n[i] - m, 0)
        m = np.maximum(0, m - a[i])
        if not m.any():
            return n
    return n

np.random.seed(100)
n = 100000
m = 10
num_max = 100
a = np.random.randint(num_max, size=(m, n))
b = np.random.randint(num_max, size=(1, n))
print(np.all(func(a, b) == func_vec(a, b)))
# True

Однако ваш алгоритм имеет важное преимущество перед векторизованным алгоритмом, который заключается в том, чтоон останавливает итерацию, когда обнаруживает, что вычитать больше нечего.Это означает, что в зависимости от размера проблемы и конкретных значений (которые определяют, когда происходит ранняя остановка, если она вообще есть), векторизованное решение может фактически быть медленнее.См. Приведенный выше пример:

%timeit func(a, b)
# 5.09 ms ± 78.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit func_vec(a, b)
# 12.4 ms ± 939 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Однако вы можете получить что-то от «лучшего из двух миров», используя Numba:

import numpy as np
import numba as nb

@nb.njit
def func_nb(a, b):
    n = np.copy(a)
    m = np.copy(b)
    zero = np.array(0, dtype=a.dtype)
    for i in range(len(n)):
        n[i] = np.maximum(n[i] - m, zero)
        m = np.maximum(zero, m - a[i])
        if not m.any():
            return n
    return n

print(np.all(func(a, b) == func_nb(a, b)))
# True
%timeit func_nb(a, b)
# 3.36 ms ± 461 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
...