Эффективное объединение нескольких массивов в одни и те же ячейки - PullRequest
0 голосов
/ 19 июня 2020

Приведенный ниже код занимает 12 секунд для num_iterations = 1.

Как я могу сделать это быстрее (включая использование многопроцессорности)? Я хочу иметь возможность сделать это для num_iterations = 1000.

Я не могу найти разумный способ использовать многопроцессорность из-за списка структуры списка, который требуется, поскольку количество элементов в каждой ячейке переменная. Многопроцессорность / memmap явно не работают с этой структурой.

import time
import numpy as np

idx = np.random.randint(int(1e6),size=int(1e7))
output = [[] for i in range(int(1e6))]
num_iterations = 1

t1=time.time()

for i in range(num_iterations):
    values = np.random.random(int(1e7))
    time.sleep(5)    # To account for the fact I need to load the data from disc.
    for j,value in enumerate(values):
        output[idx[j]].append(value)

print(time.time()-t1) # 12 seconds for num_iterations=1

1 Ответ

1 голос
/ 21 июня 2020

Следует отметить, что каждый раз, когда вы сопоставляете values с output, для каждой ячейки добавляется одинаковое количество записей. Таким образом, вместо добавления в списки вы можете предварительно выделить numpy массивы с правильным размером.

Более того, вы можете хранить индексы для каждой ячейки в массивах (по одному массиву на ячейку). Поскольку вам нужно создать эти массивы индексов только один раз для всех итераций, и поскольку их гораздо меньше, чем значений, вы можете извлечь все новые записи из values для одного конкретного контейнера, используя:

output_chunk = values[indices]

Еще быстрее использовать numba (он включен в дистрибутив Anaconda Python) для своевременной (jit) компиляции. Тогда вы получите производительность, аналогичную C, без проблем с компиляцией и компоновкой кода. Я не уверен во всех деталях оптимизации numba, но я думаю, что наиболее эффективно, если функция с пометкой @njit использует только скалярные переменные и массивы numpy, а не Python списки и словари.

Вот время для трех реализаций (для меньшего размера массива, чем в вашем вопросе):

%timeit -r2 -n5 f_reference(100)
486 ms ± 4.42 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

%timeit -r2 -n50 f_vectorized(100)
73.6 ms ± 219 µs per loop (mean ± std. dev. of 2 runs, 50 loops each)

%timeit -r2 -n50 f_numba(100)
9.6 ms ± 40.9 µs per loop (mean ± std. dev. of 2 runs, 50 loops each)

Вот реализация:

import numpy as np
import numba

np.random.seed(1)

n_vals = 10000 # 1e7
n_bins = 1000# 1e6

idx = np.random.randint(n_bins, size=n_vals)

def f_reference(n_iter):
    np.random.seed(2)
    output = [[] for i in range(n_bins)]
    for i in range(n_iter):
        values = np.random.random(n_vals) # simulate loading data
        for j,value in enumerate(values):
            output[idx[j]].append(value)    
    return [np.array(x) for x in output]

def f_vectorized(n_iter):
    np.random.seed(2)
    # each entry in bin_indices is an int array of indices into values
    # that belong in the corresponding bin.
    bin_indices = [np.where(idx==i)[0] for i in range(n_bins)]
    bin_sizes = [len(bi) for bi in bin_indices]
    output = [np.zeros(bs*n_iter) for bs in bin_sizes]
    
    for i in range(n_iter):
        values = np.random.random(n_vals) # simulate loading data
        for jbin, (indices, bsize) in enumerate(zip(bin_indices, bin_sizes)):
            output_chunk = values[indices]
            output[jbin][i*bsize:(i+1)*bsize] = output_chunk
    
    return output
    

@numba.njit
def _f_numba_chunk(i_chunk, idx, values, bin_sizes, bin_offsets, output_1):
    """Process one set of values (length n_vals).
    
    Update corresponding n_vals in output_1 array (length n_vals*n_iter).
    """
    # pointers to next entry for each bin, shape (n_bins,)
    j_bin_out = bin_offsets[:-1] + bin_sizes*i_chunk
    for i_bin, val in zip(idx, values):
        output_1[j_bin_out[i_bin]] = val
        j_bin_out[i_bin] += 1

def f_numba(n_iter):
    np.random.seed(2)
    bin_sizes, _ = np.histogram(idx, np.arange(n_bins+1)-0.5)
    bin_offsets = np.concatenate(([0], np.cumsum(bin_sizes)))*n_iter
    output_1 = np.empty(n_vals*n_iter)
    
    for i in range(n_iter):
        values = np.random.random(n_vals) # simulate loading data
        _f_numba_chunk(i, idx, values, bin_sizes, bin_offsets, output_1)
        
    # convert output_1 to list of arrays
    output = [
        output_1[bin_offsets[i]:bin_offsets[i+1]]
        for i in range(n_bins)
        ]
    
    return output

# test
out_ref = f_reference(5)
out_vec = f_vectorized(5)
out_numba = f_numba(5)


for oref, ovec, onum in zip(out_ref, out_vec, out_numba):
    assert np.all(ovec == oref)
    assert np.all(onum == oref)

С ускорением в 50 раз, возможно больше нет необходимости распараллеливать это, но часть внутри l oop for i in range(n_iter) может быть распараллелена с помощью multiprocessing.Pool; каждый рабочий возвращает output_chunk, а процессу верхнего уровня нужно только сохранить фрагменты в output.

...