Распараллеливание разреженных матриц scipy csr для умножения больших матриц с использованием pyspark - PullRequest
0 голосов
/ 13 мая 2019

Я вычисляю косинусное сходство между двумя большими наборами векторов (с одинаковыми характеристиками). Каждый набор векторов представлен в виде разреженной CSR-разреженной матрицы A и B. Я хочу вычислить A x B ^ T , которая не будет разреженной. Однако мне нужно только отслеживать значения, превышающие некоторый порог, например, 0.8. Я пытаюсь реализовать это в Pyspark с помощью ванильных RDD с идеей использования быстрых векторных операций, реализованных для матриц CSR scipy.

Строки A и B нормализованы, поэтому для вычисления косинусного сходства мне просто нужно найти точечное произведение каждой строки из A с каждой строкой из B. Размеры A составляют 5 000 000 x 5000. Размеры B составляют 2 000 000 x 5000.

Предположим, что A и B слишком велики, чтобы поместиться в память на моих рабочих узлах в качестве переменных широковещания. Как я должен подходить к распараллеливанию A и B оптимальным образом?

EDIT После того, как я опубликовал свое решение, я начал изучать другие подходы, которые могут быть более понятными и более оптимальными, в частности, функцию columnShorsities (), реализованную для объектов Spark MLlib IndexedRowMatrix. ( Какая абстракция pyspark подходит для моего большого умножения матриц? )

1 Ответ

0 голосов
/ 14 мая 2019

Мне удалось реализовать решение в этой среде.
Хотелось бы узнать, почему это решение медленное - это заказная сериализация?

def csr_mult_helper(pair):
    threshold=0.8
    A_row = pair[0][0]  # keep track of the row offset
    B_col = pair[1][0]   # offset for B (this will be a column index, after the transpose op)
    A = sparse.csr_matrix(pair[0][1], pair[0][2])  # non-zero entires, size data
    B = sparse.csr_matrix(pair[1][1], pair[1][2])

    C = A * B.T  # scipy sparse mat mul

    for row_idx, row in enumerate(C):  # I think it would be better to use a filter Transformation instead
        col_indices = row.indices      #  but I had trouble with the row and column index book keeping
        col_values = row.data

        for col_idx, val in zip(col_indices, col_values):
            if val > threshold:
                yield (A_row + row_idx, B_col + col_idx, val)  # source vector, target vector, cosine score            

def parallelize_sparse_csr(M, rows_per_chunk=1):
    [rows, cols] = M.shape
    i_row = 0
    submatrices = []
    while i_row < rows:
        current_chunk_size = min(rows_per_chunk, rows - i_row)
        submat = M[i_row:(i_row + current_chunk_size)]
        submatrices.append(   (i_row,                                #  offset
                              (submat.data, submat.indices, submat.indptr),  # sparse matrix data
                              (current_chunk_size, cols)) )      # sparse matrix shape
        i_row += current_chunk_size
    return sc.parallelize(submatrices)

########## generate test data ###########
K,L,M,N = 5,2000,3,2000  # matrix dimensions (toy example)
A_ = sparse.rand(K,L, density=0.1, format='csr')
B_ = sparse.rand(M,N, density=0.1, format='csr')
print("benchmark: {} \n".format((A_ * B_.T).todense()))  # benchmark solution for comparison

########## parallelize, multiply, and filter #########
t_start = time.time()
A = parallelize_sparse_csr(A_, rows_per_chunk=10)
B = parallelize_sparse_csr(B_, rows_per_chunk=10) # number of elements per partition, from B
            # warning: this code breaks if the B_ matrix rows_per_chunk parameter != 1
            # although I don't understand why yet

print("custom pyspark solution: ")
result = A.cartesian(B).flatMap(csr_mult_helper).collect()
print(results)

print("\n {} s elapsed".format(time.time() - t_start))

...