У меня есть этот глупый код, чтобы перефразировать поведение, с которым я столкнулся на работе:
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from struct import pack
from time import time
def packer(integer):
return pack('i', integer)
if __name__=='__main__':
pool1 = Pool()
pool2 = ProcessPoolExecutor()
nums = list(range(10**4))
start = time()
res1 = pool1.map(packer, nums)
print (f'total mp pool: {time() - start}')
start = time()
res2 = pool2.map(packer, nums)
print (f'total futures pool: {time() - start}')
pool1.close()
Я получаю (Python 3.8.1):
total mp pool: 0.1600649356842041
total futures pool: 0.9206013679504395
На работе я изменен код с mp.Pool()
на concurrent.futures
, чтобы разрешить перемещение между процессами и потоками.
Затем я обнаружил, что распространение исключений было забавным с concurrent.futures
. Возвращаясь к mp.Pool()
, увидел, что у меня снижение производительности.
Я понимаю, что concurrent.futures.ProcessPoolExecutor
должен быть API более высокого уровня, как это быстрее, чем mp.Pool()
?
Я вижу здесь , что ProcessPoolExecutor.map
просто:
super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
, где super
равно _base.Executor :
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: The size of the chunks the iterable will be broken into
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.monotonic()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
return result_iterator()
Здесь я немного теряюсь.
Спускаются ли mp.Pool
и ProcessPoolExecutor
в разные кроличьи норы? Можно ли как-то получить «хорошие вещи» из ProcessPoolExecutor
, вручную вызвав mp
/ Pool
/ map
с правильными параметрами?