Улучшение времени выполнения кода Python NumPy - PullRequest
0 голосов
/ 22 мая 2018

У меня есть код, который переназначает ячейки для большого массива numpy.В основном, элементы большого массива были выбраны с различной частотой, и конечная цель состоит в том, чтобы повторно объединить весь массив с фиксированными ячейками freq_bins.Код довольно медленный для моего массива.Есть ли хороший способ улучшить время выполнения этого кода?Фактор немногих подойдет на данный момент.Может быть какая-то numba магия сделает.

import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    sky_by_cap = np.einsum('ij, jk->ijk', boost_factor[i],es)
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)

Ответы [ 3 ]

0 голосов
/ 23 мая 2018

Это кажется тривиально распараллеливаемым:

  • У вас есть внешний цикл, который вы запускаете 90 раз.
  • Каждый раз вы не изменяете какие-либо общие массивы, кромеfinal_emit
    • … и это только для сохранения в уникальную строку.
  • Похоже, что большая часть работы внутри цикла - это бесполезные операции над массивом,который выпустит GIL.

Итак (используя futures backport concurrent.futures, так как вы, кажется, находитесь на 2.7):

import numpy as np
import time
import futures

division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row

Если это сработает, попробуйте две настройки, каждая из которых может быть более эффективной.Нам не важно, в каком порядке возвращаются результаты, но map ставит их в очередь.Это может потратить немного времени и пространства.Я не думаю, что это будет иметь большое значение (по-видимому, подавляющее большинство вашего времени, по-видимому, тратится на выполнение вычислений, а не на запись результатов), но без профилирования вашего кода трудно быть уверенным.Таким образом, есть два простых способа решения этой проблемы.


Использование as_completed позволяет нам использовать результаты в любом порядке, в котором они заканчиваются, а не в том порядке, в котором мы поставили их в очередь.Примерно так:

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return i, np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    fs = [x.submit(dostuff, i) for i in xrange(division))
    for i, row in futures.as_completed(fs): 
        final_emit[i] = row

В качестве альтернативы, мы можем заставить функцию вставлять строки напрямую, а не возвращать их.Это означает, что мы теперь мутируем общий объект из нескольких потоков.Так что я думаю, что здесь нужна блокировка, хотя я не уверен (правила numpy немного сложны, и я не прочитал ваш код, который полностью…).Но это, вероятно, не повредит производительности, и это легко.Итак:

import numpy as np
import threading
# etc.
final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    with final_emit_lock:
        final_emit[i] = np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    x.map(dostuff, xrange(division))

Что max_workers=8 во всех моих примерах должно быть настроено для вашей машины.Слишком много потоков - это плохо, потому что они начинают сражаться друг с другом, а не распараллеливаются;слишком мало потоков - еще хуже, потому что некоторые из ваших ядер просто простаивают.

Если вы хотите, чтобы это работало на разных машинах, вместо того, чтобы настраивать его для каждой, наилучшее предположение (для 2.7)обычно это:

import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:

Но если вы хотите выжать максимальную производительность из конкретной машины, вам следует протестировать разные значения.В частности, для типичного четырехъядерного ноутбука с гиперпоточностью идеальное значение может составлять от 4 до 8, в зависимости от конкретной работы, которую вы выполняете, и проще просто попробовать все значения, чем пытаться предсказать.

0 голосов
/ 23 мая 2018

Сохраняйте код простым и оптимизируйте его

Если у вас есть представление о том, какой алгоритм вы хотите кодировать, напишите простую справочную реализацию.Из этого вы можете пойти двумя путями, используя Python.Вы можете попробовать векторизовать код или , вы можете скомпилировать код для получения хорошей производительности.

Даже если бы np.einsum или np.add.at были реализованы в Numba, это было бы очень труднолюбой компилятор для создания эффективного бинарного кода из вашего примера.

Единственное, что я переписал, - это более эффективный подход оцифровки для скалярных значений.

Edit

В исходном коде Numba естьболее эффективное использование оцифровки для скалярных значений.

Код

#From Numba source
#Copyright (c) 2012, Anaconda, Inc.
#All rights reserved.

@nb.njit(fastmath=True)
def digitize(x, bins, right=False):
    # bins are monotonically-increasing
    n = len(bins)
    lo = 0
    hi = n

    if right:
        if np.isnan(x):
            # Find the first nan (i.e. the last from the end of bins,
            # since there shouldn't be many of them in practice)
            for i in range(n, 0, -1):
                if not np.isnan(bins[i - 1]):
                    return i
            return 0
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] < x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid
    else:
        if np.isnan(x):
            # NaNs end up in the last bin
            return n
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] <= x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid

    return lo

@nb.njit(fastmath=True)
def digitize(value, bins):
  if value<bins[0]:
    return 0

  if value>=bins[bins.shape[0]-1]:
    return bins.shape[0]

  for l in range(1,bins.shape[0]):
    if value>=bins[l-1] and value<bins[l]:
      return l

@nb.njit(fastmath=True,parallel=True)
def inner_loop(boost_factor,freq_bins,es):
  res=np.zeros((boost_factor.shape[0],freq_bins.shape[0]),dtype=np.float64)
  for i in nb.prange(boost_factor.shape[0]):
    for j in range(boost_factor.shape[1]):
      for k in range(freq_bins.shape[0]):
        ind=nb.int64(digitize(boost_factor[i,j]*freq_bins[k],freq_bins))
        res[i,ind]+=boost_factor[i,j]*es[j,k]*freq_bins[ind]
  return res

@nb.njit(fastmath=True)
def calc_nb(division,freq_division,cd,boost_factor,freq_bins,es):
  final_emit = np.empty((division, division, freq_division),np.float64)
  for i in range(division):
    final_emit[i,:,:]=inner_loop(boost_factor[i],freq_bins,es)
  return final_emit

Производительность

(Quadcore i7)
original_code: 118.5s
calc_nb: 4.14s
#with digitize implementation from Numba source
calc_nb: 2.66s
0 голосов
/ 22 мая 2018

Я думаю, что вы получите небольшое повышение производительности, заменив einsum фактическим умножением.

import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = boost_factor[i][:, :, None]*freq_bins[None, None, :]
    sky_by_cap = boost_factor[i][:, :, None]*es[None, :, :]
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)

Ваш код довольно медленный на np.add.at, который, я считаю, может быть намного быстрее с np.bincount, хотя я не мог заставить его работать на многомерных массивах, которые у вас есть.Может быть, кто-то здесь может добавить к этому.

...