Есть две вещи, которые мне не понятны относительно того, как работает опция "batch_size" в joblib.Следующий код может охватить мою проблему.
from multiprocessing import Pool
from joblib import Parallel, delayed
import time
import numpy as np
import sys
def doubler(number):
time.sleep(0.01)
return number * 2
def main(idx,num_proc):
num_elements = 1000
num_iter = 2
# multiprocessing
if idx == 0:
x = np.arange(1,num_elements,1,int)
print("Module: multiprocessing.Pool")
chunk = int(num_elements/(4*num_proc))
pool = Pool(processes=num_proc)
for iter in range(num_iter):
t1 = time.time()
result = pool.map(doubler, x, chunk)
print("num_proc =", num_proc, ", runtime = ", time.time() - t1)
pool.close()
# joblib
elif idx == 1:
print("Module: joblib.Parallel")
# chunk = int(num_elements/(4*num_proc))
with Parallel(n_jobs=num_proc,backend='multiprocessing',batch_size='auto',prefer='processes',max_nbytes=None,verbose=10) as parallel:
# with Parallel(n_jobs=num_proc,backend='multiprocessing',batch_size=chunk,pre_dispatch=chunk,prefer='processes',max_nbytes=None,verbose=20) as parallel:
for iter in range(num_iter):
t1 = time.time()
result = parallel(delayed(doubler)(x) for x in np.arange(1,num_elements,1,int))
print("num_proc =", num_proc, ", runtime = ", time.time() - t1)
if __name__ == '__main__':
idx = 1
num_proc = 2
if len(sys.argv) > 1:
idx = int(sys.argv[1])
num_proc = int(sys.argv[2])
main(idx,num_proc)
Во-первых, код с idx = 1 показывает роль batch_size в joblib.Когда я запускаю программу, значение по умолчанию должно быть pre_dispath = 4 и batch_size = 'auto' с начальным размером 1. Когда печатается подробное сообщение, оно начинается с batch_size = 1, а затем переключается на batch_size = 2 после 12Задачи, а затем batch_size = 26 после 20 задач, и в целом первый запуск занял 5,7 с.
Module: joblib.Parallel
[Parallel(n_jobs=2)]: Using backend MultiprocessingBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 2 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 3 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 5 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 6 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 7 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 8 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 9 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Batch computation too fast (0.1690s.) Setting batch_size=2.
[Parallel(n_jobs=2)]: Done 10 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 11 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 12 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 14 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Batch computation too fast (0.0301s.) Setting batch_size=26.
[Parallel(n_jobs=2)]: Done 16 tasks | elapsed: 0.4s
[Parallel(n_jobs=2)]: Done 18 tasks | elapsed: 0.5s
[Parallel(n_jobs=2)]: Done 20 tasks | elapsed: 0.5s
[Parallel(n_jobs=2)]: Done 46 tasks | elapsed: 0.7s
...
[Parallel(n_jobs=2)]: Done 999 out of 999 | elapsed: 5.6s finished
num_proc = 2 , runtime = 5.668241024017334
Теперь, повторно используя те же пулы, я снова запускаю ту же функцию.
[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.0s
[Parallel(n_jobs=2)]: Done 30 tasks | elapsed: 0.2s
[Parallel(n_jobs=2)]: Done 56 tasks | elapsed: 0.5s
[Parallel(n_jobs=2)]: Done 82 tasks | elapsed: 0.8s
...
[Parallel(n_jobs=2)]: Done 999 tasks | elapsed: 10.3s
[Parallel(n_jobs=2)]: Done 999 out of 999 | elapsed: 10.3s finished
num_proc = 2 , runtime = 10.353703498840332
Из сообщения ясно, что pre_dispatch = 4 и batch_size = 26. Эти 26 batch_size получены из предыдущего запуска программы.Однако второй прогон занял 10,4 с, что в два раза медленнее, чем первый прогон.
Мой первый вопрос: 1.Есть ли способ обойти эту проблему перезаписи "batch_size", кроме "НЕ" повторного использования пулов?
Во-вторых, я пытался понять любые издержки в joblib.Parallel (еслилюбой) по сравнению с мультипроцессингом.Для этого я изменил код, добавив
chunk = int(num_elements/(4*num_proc))
with Parallel(n_jobs=num_proc,backend='multiprocessing',batch_size=chunk,pre_dispatch=chunk,prefer='processes',max_nbytes=None,verbose=20) as parallel:
Насколько я понимаю (исправьте меня, если я ошибаюсь), что chunk_size по умолчанию в multirprocessing.Pool.map имеет значение num_elements / (4 * num_proc).Однако результаты показывают, что joblib.Parallel занимает 10,6 с, а multiprocessing.Pool - 5,4 с (вы можете проверить это, установив idx = 0).
Затем мой второй вопрос: 2.Чем «batch_size» в joblib отличается от «chunk_size» в многопроцессорной обработке?
(обратите внимание, что я использую python 3.6.5 и joblib 0.12.5)