Как получить индексы диагональных элементов массива данных разреженной матрицы - PullRequest
0 голосов
/ 16 октября 2018

У меня есть разреженная матрица в формате csr, например:

>>> a = sp.random(3, 3, 0.6, format='csr')  # an example
>>> a.toarray()  # just to see how it looks like
array([[0.31975333, 0.88437035, 0.        ],
       [0.        , 0.        , 0.        ],
       [0.14013856, 0.56245834, 0.62107962]])
>>> a.data  # data array
array([0.31975333, 0.88437035, 0.14013856, 0.56245834, 0.62107962])

Для этого конкретного примера я хочу получить [0, 4], которые являются индексами массива данных ненулевых диагональных элементов 0.31975333 и 0.62107962.

Простой способ сделать это заключается в следующем:

ind = []
seen = set()
for i, val in enumerate(a.data):
    if val in a.diagonal() and val not in seen:
        ind.append(i)
        seen.add(val)

Но на практике матрица очень большая, поэтому я не хочу использоватьдля циклов или преобразовать в массив Numpy, используя метод toarray().Есть ли более эффективный способ сделать это?

Редактировать : Я только что понял, что приведенный выше код дает неверный результат в случаях, когда есть недиагональные элементы, равные и предшествующие некоторым издиагональные элементы: возвращает индексы этого недиагонального элемента.Также он не возвращает индексы повторяющихся диагональных элементов.Например:

a = np.array([[0.31975333, 0.88437035, 0.        ],
              [0.62107962, 0.31975333, 0.        ],
              [0.14013856, 0.56245834, 0.62107962]])
a = sp.csr_matrix(a)

>>> a.data
array([0.31975333, 0.88437035, 0.62107962, 0.31975333, 0.14013856,
       0.56245834, 0.62107962])

Мой код возвращает ind = [0, 2], но должно быть [0, 3, 6].Код, предоставленный Андрасом Диком (его функция get_rowwise), возвращает правильный результат.

Ответы [ 3 ]

0 голосов
/ 17 октября 2018

Метод 1

Это векторизованный подход, который сначала генерирует все ненулевые индексы, а затем получает позиции, в которых индекс строки и столбца одинаков.Это немного медленно и требует много памяти.

import numpy as np
import scipy.sparse as sp
import numba as nb

def get_diag_ind_vec(csr_array):
  inds=csr_array.nonzero()
  return np.array(np.where(inds[0]==inds[1])[0])

Метод 2

В общем, циклические подходы не являются проблемой в отношении производительности, если вы делаетеиспользование компилятора, например.Numba или Cython.Я выделил память для максимально возможного количества диагональных элементов.Если этот метод использует много памяти, его можно легко изменить.

@nb.jit()
def get_diag_ind(csr_array):
    ind=np.empty(csr_array.shape[0],dtype=np.uint64)
    rowPtr=csr_array.indptr
    colInd=csr_array.indices

    ii=0
    for i in range(rowPtr.shape[0]-1):
      for j in range(rowPtr[i],rowPtr[i+1]):
        if (i==colInd[j]):
          ind[ii]=j
          ii+=1

    return ind[:ii]

Время

csr_array = sp.random(1000, 1000, 0.5, format='csr')

get_diag_ind_vec(csr_array)   -> 8.25ms
get_diag_ind(csr_array)       -> 0.65ms (first call excluded)
0 голосов
/ 17 октября 2018

Вот мое решение, которое кажется быстрее, чем get_rowwise (Андрас Дик) и get_diag_ind_vec (макс. 9111) (я не рассматриваю использование Numba или Cython).

Идея состоит в том, чтобы установитьненулевые диагональные элементы матрицы (или ее копии) с некоторым уникальным значением x, которого нет в исходной матрице (я выбрал максимальное значение + 1), а затем просто используйте np.where(a.data == x), чтобы вернуть нужные индексы.

def diag_ind(a):
    a = a.copy()
    i = a.diagonal() != 0  
    x = np.max(a.data) + 1
    a[i, i] = x
    return np.where(a.data == x)

Время:

A = sp.random(1000, 1000, 0.5, format='csr')

>>> %timeit diag_ind(A)
6.32 ms ± 335 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

>>> %timeit get_diag_ind_vec(A)
14.6 ms ± 292 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

>>> %timeit get_rowwise(A)
24.3 ms ± 5.28 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Редактировать: копирование разреженной матрицы (для сохранения исходной матрицы) неэффективно для памяти, поэтому лучшеРешением было бы сохранить диагональные элементы, а затем использовать их для восстановления исходной матрицы.

def diag_ind2(a):
    a_diag = a.diagonal()
    i = a_diag != 0  
    x = np.max(a.data) + 1
    a[i, i] = x
    ind = np.where(a.data == x)
    a[i, i] = a_diag[np.nonzero(a_diag)]
    return ind

Это еще быстрее:

>>> %timeit diag_ind2(A)
2.83 ms ± 419 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
0 голосов
/ 17 октября 2018

Я нашел, возможно, более эффективное решение, хотя оно все еще работает.Тем не менее, он зацикливается на строках матрицы, а не на самих элементах.В зависимости от шаблона разреженности вашей матрицы это может быть или не быть быстрее.Это гарантированно обойдется в N итераций для разреженной матрицы с N строками.

Мы просто перебираем каждую строку, выбираем индексы заполненных столбцов с помощью a.indices и a.indptr и, если диагональэлемент для данной строки присутствует в заполненных значениях, затем мы вычисляем его индекс:

import numpy as np
import scipy.sparse as sp

def orig_loopy(a):
    ind = []
    seen = set()
    for i, val in enumerate(a.data):
        if val in a.diagonal() and val not in seen:
            ind.append(i)
            seen.add(val)
    return ind

def get_rowwise(a):
    datainds = []
    indices = a.indices # column indices of filled values
    indptr = a.indptr   # auxiliary "pointer" to data indices
    for irow in range(a.shape[0]):
        rowinds = indices[indptr[irow]:indptr[irow+1]] # column indices of the row
        if irow in rowinds:
            # then we've got a diagonal in this row
            # so let's find its index
            datainds.append(indptr[irow] + np.flatnonzero(irow == rowinds)[0])
    return datainds

a = sp.random(300, 300, 0.6, format='csr')
orig_loopy(a) == get_rowwise(a) # True

Для случайного входа в форме (300,300) с той же плотностью исходная версия выполняется за 3,7 секунды, новая версияработает за 5,5 миллисекунд.

...