Почему mp.Pool (). Map () медленнее, чем ProcessPoolExecutor (). Map () - PullRequest
0 голосов
/ 24 января 2020

У меня есть этот глупый код, чтобы перефразировать поведение, с которым я столкнулся на работе:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from struct import pack
from time import time


def packer(integer):

    return pack('i', integer)


if __name__=='__main__':

    pool1 = Pool()
    pool2 = ProcessPoolExecutor()

    nums = list(range(10**4))

    start = time()
    res1 = pool1.map(packer, nums)
    print (f'total mp pool: {time() - start}')

    start = time()
    res2 = pool2.map(packer, nums)
    print (f'total futures pool: {time() - start}')

    pool1.close()

Я получаю (Python 3.8.1):

total mp pool: 0.1600649356842041
total futures pool: 0.9206013679504395

На работе я изменен код с mp.Pool() на concurrent.futures, чтобы разрешить перемещение между процессами и потоками.

Затем я обнаружил, что распространение исключений было забавным с concurrent.futures. Возвращаясь к mp.Pool(), увидел, что у меня снижение производительности.

Я понимаю, что concurrent.futures.ProcessPoolExecutor должен быть API более высокого уровня, как это быстрее, чем mp.Pool()?

Я вижу здесь , что ProcessPoolExecutor.map просто:

super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)

, где super равно _base.Executor :

def map(self, fn, *iterables, timeout=None, chunksize=1):
    """Returns an iterator equivalent to map(fn, iter).
    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: The size of the chunks the iterable will be broken into
            before being passed to a child process. This argument is only
            used by ProcessPoolExecutor; it is ignored by
            ThreadPoolExecutor.
    Returns:
        An iterator equivalent to: map(func, *iterables) but the calls may
        be evaluated out-of-order.
    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.monotonic())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

Здесь я немного теряюсь.

Спускаются ли mp.Pool и ProcessPoolExecutor в разные кроличьи норы? Можно ли как-то получить «хорошие вещи» из ProcessPoolExecutor, вручную вызвав mp / Pool / map с правильными параметрами?

...