Размер куска не имеет отношения к мультипроцессору / pool.map в Python? - PullRequest
0 голосов
/ 14 ноября 2018

Я пытаюсь использовать функциональность пула для многопроцессорной обработки python.

Независимо от того, как я устанавливаю размер чанка (под Windows 7 и Ubuntu - последний см. Ниже с 4 ядрами), количество параллельных потоков кажется равнымоставайтесь прежними.

from multiprocessing import Pool
from multiprocessing import cpu_count
import multiprocessing
import time


def f(x):
    print("ready to sleep", x, multiprocessing.current_process())
    time.sleep(20)
    print("slept with:", x, multiprocessing.current_process())


if __name__ == '__main__':
    processes = cpu_count()
    print('-' * 20)
    print('Utilizing %d cores' % processes)
    print('-' * 20)
    pool = Pool(processes)
    myList = []
    runner = 0
    while runner < 40:
        myList.append(runner)
        runner += 1
    print("len(myList):", len(myList))

    # chunksize = int(len(myList) / processes)
    # chunksize = processes
    chunksize = 1
    print("chunksize:", chunksize)
    pool.map(f, myList, 1)

Поведение одинаково, использую ли я chunksize = int(len(myList) / processes), chunksize = processes или 1 (как в примере выше).

Может ли это бытьразмер куска автоматически устанавливается на количество ядер?

Пример для chunksize = 1:

--------------------
Utilizing 4 cores
--------------------
len(myList): 40
chunksize: 10
ready to sleep 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 0 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 1 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 2 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 3 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 4 <ForkProcess(ForkPoolWorker-1, started daemon)>
ready to sleep 8 <ForkProcess(ForkPoolWorker-1, started daemon)>
slept with: 5 <ForkProcess(ForkPoolWorker-2, started daemon)>
ready to sleep 9 <ForkProcess(ForkPoolWorker-2, started daemon)>
slept with: 6 <ForkProcess(ForkPoolWorker-3, started daemon)>
ready to sleep 10 <ForkProcess(ForkPoolWorker-3, started daemon)>
slept with: 7 <ForkProcess(ForkPoolWorker-4, started daemon)>
ready to sleep 11 <ForkProcess(ForkPoolWorker-4, started daemon)>
slept with: 8 <ForkProcess(ForkPoolWorker-1, started daemon)>

1 Ответ

0 голосов
/ 14 ноября 2018

Размер куска не влияет на то, сколько ядер используется, это задается параметром processes Pool.Chunksize устанавливает, сколько элементов итерируемого элемента, который вы передаете Pool.map, распределяется по одному рабочему процессу за один раз , что Pool называет "задачей" (на рисунке ниже показан Python 3.7.1).

task_python_3.7.1

Если вы установите chunksize=1, рабочий процесс получает новый элемент в новом задании только после выполнения одногополучил раньше.Для chunksize > 1 рабочий получает сразу целую партию предметов в рамках задачи, а когда она завершается, он получает следующую партию, если они еще остались.

Распределение предметов по одному с помощью chunksize=1увеличивает гибкость планирования при одновременном снижении общей пропускной способности, поскольку для капельной подачи требуется больше межпроцессного взаимодействия (IPC).

В моем углубленном анализе алгоритма chunksize-пула здесь я определяю единица работы для обработки одного элемента итерируемого как taskel , чтобы избежать конфликтов имен с использованием пула слова "задача".Задача (как единица работы) состоит из chunksize Taskels.

Вы бы установили chunksize=1, если не можете предсказать, как долго нужно будет выполнить Taskel, например, задача оптимизации, где время обработкисильно варьируется в зависимости от задач.Здесь капельное кормление не позволяет рабочему процессу сидеть на куче нетронутых предметов, в то же время хрустеть на одном тяжелом задании, не позволяя другим элементам в его задаче распределяться между рабочими процессами в режиме ожидания.

В противном случае, если всем вашим задачам потребуется одинаковое время для завершения, вы можете установить chunksize=len(iterable) // processes, чтобы задачи распределялись по всем работникам только один раз.Обратите внимание, что это создаст еще одну задачу, чем процессы (процессы + 1), если у len(iterable) / processes есть остаток.Это может серьезно повлиять на общее время вычислений.Подробнее об этом читайте в ранее связанном ответе.


К вашему сведению, это часть исходного кода, где Pool внутренне вычисляет размер фрагмента, если не установлен:

    # Python 3.6, line 378 in `multiprocessing.pool.py`
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
...