Это кажется тривиально распараллеливаемым:
- У вас есть внешний цикл, который вы запускаете 90 раз.
- Каждый раз вы не изменяете какие-либо общие массивы, кроме
final_emit
- … и это только для сохранения в уникальную строку.
- Похоже, что большая часть работы внутри цикла - это бесполезные операции над массивом,который выпустит GIL.
Итак (используя futures
backport concurrent.futures
, так как вы, кажется, находитесь на 2.7):
import numpy as np
import time
import futures
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
def dostuff(i):
fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
# ...
to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
return np.sum(to_bin_emit, axis=1)
with futures.ThreadPoolExecutor(max_workers=8) as x:
for i, row in enumerate(x.map(dostuff, xrange(division))):
final_emit[i] = row
Если это сработает, попробуйте две настройки, каждая из которых может быть более эффективной.Нам не важно, в каком порядке возвращаются результаты, но map
ставит их в очередь.Это может потратить немного времени и пространства.Я не думаю, что это будет иметь большое значение (по-видимому, подавляющее большинство вашего времени, по-видимому, тратится на выполнение вычислений, а не на запись результатов), но без профилирования вашего кода трудно быть уверенным.Таким образом, есть два простых способа решения этой проблемы.
Использование as_completed
позволяет нам использовать результаты в любом порядке, в котором они заканчиваются, а не в том порядке, в котором мы поставили их в очередь.Примерно так:
def dostuff(i):
fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
# ...
to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
return i, np.sum(to_bin_emit, axis=1)
with futures.ThreadPoolExecutor(max_workers=8) as x:
fs = [x.submit(dostuff, i) for i in xrange(division))
for i, row in futures.as_completed(fs):
final_emit[i] = row
В качестве альтернативы, мы можем заставить функцию вставлять строки напрямую, а не возвращать их.Это означает, что мы теперь мутируем общий объект из нескольких потоков.Так что я думаю, что здесь нужна блокировка, хотя я не уверен (правила numpy немного сложны, и я не прочитал ваш код, который полностью…).Но это, вероятно, не повредит производительности, и это легко.Итак:
import numpy as np
import threading
# etc.
final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()
def dostuff(i):
fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
# ...
to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
with final_emit_lock:
final_emit[i] = np.sum(to_bin_emit, axis=1)
with futures.ThreadPoolExecutor(max_workers=8) as x:
x.map(dostuff, xrange(division))
Что max_workers=8
во всех моих примерах должно быть настроено для вашей машины.Слишком много потоков - это плохо, потому что они начинают сражаться друг с другом, а не распараллеливаются;слишком мало потоков - еще хуже, потому что некоторые из ваших ядер просто простаивают.
Если вы хотите, чтобы это работало на разных машинах, вместо того, чтобы настраивать его для каждой, наилучшее предположение (для 2.7)обычно это:
import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:
Но если вы хотите выжать максимальную производительность из конкретной машины, вам следует протестировать разные значения.В частности, для типичного четырехъядерного ноутбука с гиперпоточностью идеальное значение может составлять от 4 до 8, в зависимости от конкретной работы, которую вы выполняете, и проще просто попробовать все значения, чем пытаться предсказать.