Следует отметить, что каждый раз, когда вы сопоставляете values
с output
, для каждой ячейки добавляется одинаковое количество записей. Таким образом, вместо добавления в списки вы можете предварительно выделить numpy массивы с правильным размером.
Более того, вы можете хранить индексы для каждой ячейки в массивах (по одному массиву на ячейку). Поскольку вам нужно создать эти массивы индексов только один раз для всех итераций, и поскольку их гораздо меньше, чем значений, вы можете извлечь все новые записи из values
для одного конкретного контейнера, используя:
output_chunk = values[indices]
Еще быстрее использовать numba (он включен в дистрибутив Anaconda Python) для своевременной (jit) компиляции. Тогда вы получите производительность, аналогичную C, без проблем с компиляцией и компоновкой кода. Я не уверен во всех деталях оптимизации numba, но я думаю, что наиболее эффективно, если функция с пометкой @njit
использует только скалярные переменные и массивы numpy, а не Python списки и словари.
Вот время для трех реализаций (для меньшего размера массива, чем в вашем вопросе):
%timeit -r2 -n5 f_reference(100)
486 ms ± 4.42 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
%timeit -r2 -n50 f_vectorized(100)
73.6 ms ± 219 µs per loop (mean ± std. dev. of 2 runs, 50 loops each)
%timeit -r2 -n50 f_numba(100)
9.6 ms ± 40.9 µs per loop (mean ± std. dev. of 2 runs, 50 loops each)
Вот реализация:
import numpy as np
import numba
np.random.seed(1)
n_vals = 10000 # 1e7
n_bins = 1000# 1e6
idx = np.random.randint(n_bins, size=n_vals)
def f_reference(n_iter):
np.random.seed(2)
output = [[] for i in range(n_bins)]
for i in range(n_iter):
values = np.random.random(n_vals) # simulate loading data
for j,value in enumerate(values):
output[idx[j]].append(value)
return [np.array(x) for x in output]
def f_vectorized(n_iter):
np.random.seed(2)
# each entry in bin_indices is an int array of indices into values
# that belong in the corresponding bin.
bin_indices = [np.where(idx==i)[0] for i in range(n_bins)]
bin_sizes = [len(bi) for bi in bin_indices]
output = [np.zeros(bs*n_iter) for bs in bin_sizes]
for i in range(n_iter):
values = np.random.random(n_vals) # simulate loading data
for jbin, (indices, bsize) in enumerate(zip(bin_indices, bin_sizes)):
output_chunk = values[indices]
output[jbin][i*bsize:(i+1)*bsize] = output_chunk
return output
@numba.njit
def _f_numba_chunk(i_chunk, idx, values, bin_sizes, bin_offsets, output_1):
"""Process one set of values (length n_vals).
Update corresponding n_vals in output_1 array (length n_vals*n_iter).
"""
# pointers to next entry for each bin, shape (n_bins,)
j_bin_out = bin_offsets[:-1] + bin_sizes*i_chunk
for i_bin, val in zip(idx, values):
output_1[j_bin_out[i_bin]] = val
j_bin_out[i_bin] += 1
def f_numba(n_iter):
np.random.seed(2)
bin_sizes, _ = np.histogram(idx, np.arange(n_bins+1)-0.5)
bin_offsets = np.concatenate(([0], np.cumsum(bin_sizes)))*n_iter
output_1 = np.empty(n_vals*n_iter)
for i in range(n_iter):
values = np.random.random(n_vals) # simulate loading data
_f_numba_chunk(i, idx, values, bin_sizes, bin_offsets, output_1)
# convert output_1 to list of arrays
output = [
output_1[bin_offsets[i]:bin_offsets[i+1]]
for i in range(n_bins)
]
return output
# test
out_ref = f_reference(5)
out_vec = f_vectorized(5)
out_numba = f_numba(5)
for oref, ovec, onum in zip(out_ref, out_vec, out_numba):
assert np.all(ovec == oref)
assert np.all(onum == oref)
С ускорением в 50 раз, возможно больше нет необходимости распараллеливать это, но часть внутри l oop for i in range(n_iter)
может быть распараллелена с помощью multiprocessing.Pool
; каждый рабочий возвращает output_chunk
, а процессу верхнего уровня нужно только сохранить фрагменты в output
.