Python multiprocessing.Pool.map поведение, когда список длиннее, чем число процессов - PullRequest
0 голосов
/ 08 марта 2020

При отправке списка задач, длина которого превышает количество процессов, как процессы назначаются этим задачам?

from multiprocessing import Pool

def f(i):
    print(i)
    return i

with Pool(2) as pool:
    print(pool.map(f, [1, 2, 3, 4, 5]))

Я выполняю более сложную функцию, а выполнение не выполняется вроде бы в порядке (FIFO).

Ответы [ 2 ]

1 голос
/ 08 марта 2020

Класс Pool представляет пул рабочих процессов. Он запускает новый процесс, как только один из существующих процессов завершается. Чтобы лучше понять, мы устанавливаем chunksize=1, рассмотрим код,

from multiprocessing import Pool
from time import sleep


def f(x):
    print(f"Task {x} enter")
    sleep(5)
    print(f"Task {x} exit")
    return x * x


if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(f, range(10), chunksize=1))

Итак, порядок выполнения будет,

Task 0 enter
Task 1 enter
Task 0 exit
Task 2 enter
Task 1 exit
Task 3 enter
Task 2 exit
Task 4 enter
Task 3 exit
Task 5 enter
Task 4 exit
Task 6 enter
Task 5 exit
Task 7 enter
Task 6 exit
Task 8 enter
Task 7 exit
Task 9 enter
Task 8 exit
Task 9 exit
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
1 голос
/ 08 марта 2020

Вот пример кода:

from multiprocessing import Pool
from time import sleep


def f(x):
    print(x)
    sleep(0.1)
    return x * x


if __name__ == '__main__':
    with Pool(2) as pool:
        print(pool.map(f, range(100)))

Который распечатывает:

0
13
1
14
2
15
3
16
4
...

Если мы посмотрим на соответствующий исходный код в multiprocessing:

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
            error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        self._check_running()
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)

Здесь мы имеем len(iterable) == 100, len(self._pool) * 4 == 8, поэтому chunksize, extra = 12, 4, что приводит к chunksize = 13, следовательно, вывод показывает, что задачи разбиты на пакеты по 13.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...