Чем «batch_size» в joblib.Parallel отличается от «chunk_size» в multiprocessing.Pool.map? - PullRequest
0 голосов
/ 17 декабря 2018

Есть две вещи, которые мне не понятны относительно того, как работает опция "batch_size" в joblib.Следующий код может охватить мою проблему.

from multiprocessing import Pool
from joblib import Parallel, delayed
import time
import numpy as np
import sys

def doubler(number):
    time.sleep(0.01)
    return number * 2

def main(idx,num_proc):
    num_elements = 1000
    num_iter     = 2

    # multiprocessing    
    if idx == 0:
        x = np.arange(1,num_elements,1,int)
        print("Module: multiprocessing.Pool")
        chunk = int(num_elements/(4*num_proc))
        pool = Pool(processes=num_proc)
        for iter in range(num_iter):
            t1 = time.time()
            result = pool.map(doubler, x, chunk)
            print("num_proc =", num_proc, ", runtime = ", time.time() - t1)
        pool.close()    

    # joblib
    elif idx == 1:
        print("Module: joblib.Parallel")
#        chunk = int(num_elements/(4*num_proc))
        with Parallel(n_jobs=num_proc,backend='multiprocessing',batch_size='auto',prefer='processes',max_nbytes=None,verbose=10) as parallel:
#        with Parallel(n_jobs=num_proc,backend='multiprocessing',batch_size=chunk,pre_dispatch=chunk,prefer='processes',max_nbytes=None,verbose=20) as parallel:
            for iter in range(num_iter):    
                t1 = time.time()
                result = parallel(delayed(doubler)(x) for x in np.arange(1,num_elements,1,int))        
                print("num_proc =", num_proc, ", runtime = ", time.time() - t1)

if __name__ == '__main__':
    idx = 1
    num_proc = 2
    if len(sys.argv) > 1:
        idx = int(sys.argv[1])
        num_proc = int(sys.argv[2])
    main(idx,num_proc)

Во-первых, код с idx = 1 показывает роль batch_size в joblib.Когда я запускаю программу, значение по умолчанию должно быть pre_dispath = 4 и batch_size = 'auto' с начальным размером 1. Когда печатается подробное сообщение, оно начинается с batch_size = 1, а затем переключается на batch_size = 2 после 12Задачи, а затем batch_size = 26 после 20 задач, и в целом первый запуск занял 5,7 с.

Module: joblib.Parallel
[Parallel(n_jobs=2)]: Using backend MultiprocessingBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   8 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   9 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Batch computation too fast (0.1690s.) Setting batch_size=2.
[Parallel(n_jobs=2)]: Done  10 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done  11 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done  12 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done  14 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Batch computation too fast (0.0301s.) Setting batch_size=26.
[Parallel(n_jobs=2)]: Done  16 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done  18 tasks      | elapsed:    0.5s
[Parallel(n_jobs=2)]: Done  20 tasks      | elapsed:    0.5s
[Parallel(n_jobs=2)]: Done  46 tasks      | elapsed:    0.7s
...
[Parallel(n_jobs=2)]: Done 999 out of 999 | elapsed:    5.6s finished
num_proc = 2 , runtime =  5.668241024017334

Теперь, повторно используя те же пулы, я снова запускаю ту же функцию.

[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.0s
[Parallel(n_jobs=2)]: Done  30 tasks      | elapsed:    0.2s
[Parallel(n_jobs=2)]: Done  56 tasks      | elapsed:    0.5s
[Parallel(n_jobs=2)]: Done  82 tasks      | elapsed:    0.8s
...
[Parallel(n_jobs=2)]: Done 999 tasks      | elapsed:   10.3s
[Parallel(n_jobs=2)]: Done 999 out of 999 | elapsed:   10.3s finished
num_proc = 2 , runtime =  10.353703498840332

Из сообщения ясно, что pre_dispatch = 4 и batch_size = 26. Эти 26 batch_size получены из предыдущего запуска программы.Однако второй прогон занял 10,4 с, что в два раза медленнее, чем первый прогон.

Мой первый вопрос: 1.Есть ли способ обойти эту проблему перезаписи "batch_size", кроме "НЕ" повторного использования пулов?


Во-вторых, я пытался понять любые издержки в joblib.Parallel (еслилюбой) по сравнению с мультипроцессингом.Для этого я изменил код, добавив

chunk = int(num_elements/(4*num_proc))
with Parallel(n_jobs=num_proc,backend='multiprocessing',batch_size=chunk,pre_dispatch=chunk,prefer='processes',max_nbytes=None,verbose=20) as parallel:

Насколько я понимаю (исправьте меня, если я ошибаюсь), что chunk_size по умолчанию в multirprocessing.Pool.map имеет значение num_elements / (4 * num_proc).Однако результаты показывают, что joblib.Parallel занимает 10,6 с, а multiprocessing.Pool - 5,4 с (вы можете проверить это, установив idx = 0).

Затем мой второй вопрос: 2.Чем «batch_size» в joblib отличается от «chunk_size» в многопроцессорной обработке?

(обратите внимание, что я использую python 3.6.5 и joblib 0.12.5)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...