Плохое масштабирование многопроцессорной обработки Pool.map () в списке больших объектов: как добиться лучшего параллельного масштабирования в python? - PullRequest
6 голосов
/ 08 февраля 2020

Давайте определим:

from multiprocessing import Pool
import numpy as np
def func(x):
    for i in range(1000):
        i**2
    return 1

Обратите внимание, что func() что-то делает и всегда возвращает небольшое число 1.

Затем я сравниваю 8-ядерную параллель Pool.map() v / sa серийный, python встроенный, map()

n=10**3
a=np.random.random(n).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

Это дает:

38.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
200 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

, что показывает довольно хорошее параллельное масштабирование. Поскольку я использую 8 ядер, а 38.3 [ms] составляет примерно 1/8 от 200[s]

Тогда давайте попробуем Pool.map() в списках некоторых более важных вещей, для простоты я использую список списков таким образом:

n=10**3
m=10**4
a=np.random.random((n,m)).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

, что дает:

292 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
209 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

Видите ли, параллельное масштабирование исчезло! 1 с ~ 1,76 с

Мы можем сделать это еще хуже, попробуйте сделать каждый подсписок еще больше:

n=10**3
m=10**5
a=np.random.random((n,m)).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

Это дает:

3.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
179 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

Ничего себе, с еще большими подсписками, результат синхронизации полностью противоположен. Мы используем 8 ядер, чтобы получить в 20 раз медленнее время !!

Вы также можете заметить, что время серийного map() не имеет никакого отношения к размеру подсписка. Поэтому разумным объяснением было бы то, что Pool.map() действительно передает содержимое этого большого подсписка вокруг процессов, вызывающих дополнительную копию?

Я не уверен. Но если так, то почему он не передает адрес подсписка? В конце концов, подсписок уже находится в памяти, и на практике func(), который я использовал, гарантированно не изменяет / модифицирует подсписок.

Итак, в python, Как правильно сохранить параллельное масштабирование при отображении некоторых операций в список больших вещей?

Ответы [ 2 ]

7 голосов
/ 27 февраля 2020

Прежде чем мы начнем
и погрузимся глубже в любую охоту на наносекунды (и правильно, это скоро начнется, так как каждый [ns] имеет значение, так как масштабирование открывает весь ящик Пандоры проблемы), давайте договоримся о масштабах - самые простые и часто "дешевые" преждевременные уловки могут и часто разрушат ваши мечты, как только масштабы проблемы станут реалистичными * Масштаб 1290 * - тысячи (замеченные выше в обоих итераторах) ведут себя по-разному для вычислений в кеше с < 0.5 [ns] выборками данных, чем однажды превысили L1 / L2 / L3-размеры кеша для масштабов выше 1E+5, 1E+6, 1E+9, выше [GB] с, где каждый неправильный выбор выборка НАМНОГО ДОРОГА, чем несколько 100 [ns]

Q : "... поскольку у меня 8 ядер, Я хочу использовать их, чтобы получить в 8 раз быстрее"

I wi sh Вы могли , действительно . И все же, извините за правду, Мир не работает таким образом.

Посмотрите на этот интерактивный инструмент , он покажет вам оба ограничения скорости и их принципиальная зависимость от фактических производственных затрат при реальном масштабировании исходной задачи, поскольку она вырастает из тривиальных размеров, и эти совокупные эффекты в масштабе просто нажмите -it и играйте с ползунками, чтобы увидеть его вживую, в действии :

Animated UI-interactive Law of Diminishing Returns, re-formulated so as to also contain processing add-on setup / termination overheads

Q : (is) Pool.map() действительно передача содержимого этого большого подсписка вокруг процессов, вызывающих дополнительную копию?

Да,
он должен сделать это по замыслу
плюс он делает это , передавая все эти данные "через" еще "дорого « SER / DES обработка ,
для того, чтобы это произошло, доставлено « там ».
То же самое будет применяться всегда, когда Вы бы попытались вернуть "назад" некоторые результаты размером с мастодонты, чего вы не сделали, здесь выше.

Q : Но если это так, почему он не передает адрес подсписка?

Поскольку удаленный процесс (получение параметров) - это другой, полностью автономный процесс, с его собственным, отдельным и защищенным адресным пространством мы не можем просто передать адрес-ссылку"в", и мы хотели, чтобы это было полностью независимым, автономно работающим python процесс (из-за желания использовать этот трюк, чтобы сбежать от GIL-lock dance ), не так ли? Конечно, мы это сделали - это центральный шаг нашего побега из GIL-Wars (для лучшего понимания плюсов и минусов GIL-Lock, может понравиться this и this (стр.15 + при обработке с привязкой к процессору).

             0.1 ns - NOP
             0.3 ns - XOR, ADD, SUB
             0.5 ns - CPU L1 dCACHE reference           (1st introduced in late 80-ies )
             0.9 ns - JMP SHORT
             1   ns - speed-of-light (a photon) travel a 1 ft (30.5cm) distance -- will stay, throughout any foreseeable future :o)
?~~~~~~~~~~~ 1   ns - MUL ( i**2 = MUL i, i )~~~~~~~~~ doing this 1,000 x is 1 [us]; 1,000,000 x is 1 [ms]; 1,000,000,000 x is 1 [s] ~~~~~~~~~~~~~~~~~~~~~~~~~
           3~4   ns - CPU L2  CACHE reference           (2020/Q1)
             5   ns - CPU L1 iCACHE Branch mispredict
             7   ns - CPU L2  CACHE reference
            10   ns - DIV
            19   ns - CPU L3  CACHE reference           (2020/Q1 considered slow on 28c Skylake)
            71   ns - CPU cross-QPI/NUMA best  case on XEON E5-46*
           100   ns - MUTEX lock/unlock
           100   ns - own DDR MEMORY reference
           135   ns - CPU cross-QPI/NUMA best  case on XEON E7-*
           202   ns - CPU cross-QPI/NUMA worst case on XEON E7-*
           325   ns - CPU cross-QPI/NUMA worst case on XEON E5-46*
        10,000   ns - Compress 1K bytes with a Zippy PROCESS
        20,000   ns - Send     2K bytes over 1 Gbps  NETWORK
       250,000   ns - Read   1 MB sequentially from  MEMORY
       500,000   ns - Round trip within a same DataCenter
?~~~ 2,500,000   ns - Read  10 MB sequentially from  MEMORY~~(about an empty python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s), yet an empty python interpreter is indeed not a real-world, production-grade use-case, is it?
    10,000,000   ns - DISK seek
    10,000,000   ns - Read   1 MB sequentially from  NETWORK
?~~ 25,000,000   ns - Read 100 MB sequentially from  MEMORY~~(somewhat light python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s)
    30,000,000   ns - Read 1 MB sequentially from a  DISK
?~~ 36,000,000   ns - Pickle.dump() SER a 10 MB object for IPC-transfer and remote DES in spawned process~~~~~~~~ x ( 2 ) for a single 10MB parameter-payload SER/DES + add an IPC-transport costs thereof or NETWORK-grade transport costs, if going into [distributed-computing] model Cluster ecosystem
   150,000,000   ns - Send a NETWORK packet CA -> Netherlands
  |   |   |   |
  |   |   | ns|
  |   | us|
  | ms|

Q : "что такое правильный способ поддерживать параллель масштабировать при параллельном отображении некоторых операций в списке больших вещей? "

enter image description here

A )
ПОНЯТЬ СПОСОБЫ ИЗБЕЖАТЬ ИЛИ МЕНЬШЕ СНИЖАТЬ РАСХОДЫ :

Понимать все типы затрат , которые вы должны заплатить и будет платить :

  • тратить как можно меньше процесс создание экземпляра расходы насколько это возможно ( скорее дорого) лучше всего как единовременная стоимость

    В macOS метод запуска spawn теперь по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбоям подпроцесса. См. bpo-33725 .

  • расходы на передачу параметров настолько малы, насколько это необходимо (да Лучше избегайте повторной передачи этих « больших вещей » в качестве параметров)

  • никогда не трать ресурсы на вещи, которые не выполняют вашу работу - (никогда не порождают больше процессов, чем было сообщено len( os.sched_getaffinity( 0 ) ) - любой процесс, превышающий это, будет ожидать следующего слота CPU-core и будет вытеснять другой, эффективный для кэширования процесс, таким образом, переплачивая все затраты на извлечение после того, как они уже оплачены, для повторной выборки всех данных, чтобы вернуть их обратно в кэш-память, чтобы в скором времени их снова выселили в кэш-память. вычисления, в то время как те процессы, которые до сих пор работали таким образом, были правильно вытеснены (для чего?) наивным использованием целых multiprocessing.cpu_count() -отмеченных процессов, столь дорогостоящих при первоначальном Pool -создании)
  • лучше повторно использовать предварительно выделенную память, чем продолжать тратить ad-ho c затраты на выделение памяти ALAP
  • никогда не делятся, если цель - производительность
  • никогда не блокировать, никогда - будь то python gc, который может блокироваться, если его не избежать, или Pool.map(), который блокирует либо

B )
ПОД НАЖМИТЕ ПУТИ ПОВЫШЕНИЯ ЭФФЕКТИВНОСТИ :

Поймите все приемы повышения эффективности, даже за счет сложности кода (несколько SLO C -с легко показать в школьных учебниках, однако жертвуя как эффективностью, так и производительностью - несмотря на то, что оба они являются вашим главным врагом в борьбе за устойчивую производительность на протяжении масштабирования (либо размера проблемы или глубины итерации, либо при одновременном росте их обоих).

Некоторые категории реальных затрат с A ) резко изменили пределы теоретически достижимых ускорений, ожидаемых от перехода в некую форму [PARALLEL] оркестровок процессов (здесь выполнение некоторых частей кода выполняется в порожденных подпроцессах), первоначальное представление которых сначала было сформулировано Доктор Джин Амдаль еще в 60+ лет go (для которого недавно были добавлены два основных расширения обоих Связанные с экземпляром (-ами) процессы настройка + завершение добавление затрат (чрезвычайно важно в py2 всегда & py3.5 + для MacOS и Windows) и atomicity-of-work, о которых пойдет речь ниже.

Пересмотр строгой переформулировки закона Амдала S:

S   = speedup which can be achieved with N processors
s   = a proportion of a calculation, which is [SERIAL]
1-s = a parallelizable portion, that may run  [PAR]
N   = a number of processors ( CPU-cores ) actively participating on [PAR] processing

               1
S =  __________________________; where s, ( 1 - s ), N were defined above
                ( 1 - s )            pSO:= [PAR]-Setup-Overhead     add-on cost/latency
     s  + pSO + _________ + pTO      pTO:= [PAR]-Terminate-Overhead add-on cost/latency
                    N               

Пересмотр строгой и ресурсов- переосмысление:

                           1                         where s, ( 1 - s ), N
S =  ______________________________________________ ;      pSO, pTO
                   | ( 1 - s )             |               were defined above
     s  + pSO + max|  _________ , atomicP  |  + pTO        atomicP:= a unit of work,
                   |     N                 |                         further indivisible,
                                                                     a duration of an
                                                                     atomic-process-block

Прототип на целевом устройстве ЦП / ОЗУ с вашим python, масштабированный >> 1E+6

Любой упрощенный пример макета будет каким-то образом исказите ваши ожидания о том, как фактические рабочие нагрузки будут выполняться в естественных условиях. Недооцененные объемы ОЗУ, которые не видны в небольших масштабах, могут позже удивить масштабом, иногда даже приводя к тому, что операционная система переходит в состояние медленного sh, обменивая и перебивая. Некоторые более умные инструменты (numba.jit()) могут даже анализировать код и сокращать некоторые отрывки кода, которые никогда не будут посещены или не принесут никакого результата, поэтому предупреждаем, что упрощенные примеры могут привести к неожиданным наблюдениям.

from multiprocessing import Pool
import numpy as np
import os

SCALE = int( 1E9 )
STEP  = int( 1E1 )
aLIST = np.random.random( ( 10**3, 10**4 ) ).tolist()

#######################################################################################
#   func() does some SCALE'd amount of work, yet
#                                                passes almost zero bytes as parameters
#                                                allocates nothing, but iterator
#                                                returns one byte,
#                                                invariant to any expensive inputs
def func( x ):  
    for i in range( SCALE ):
        i**2
    return 1

Несколько советов о том, как сделать стратегию масштабирования менее затратной на накладные расходы:

#####################################################################################
#   more_work_en_block() wraps some SCALE'd amount of work, sub-list specified
def more_work_en_block( en_block = [ None, ] ):
    return [ func( nth_item ) for nth_item in en_block ]

Если действительно нужно пропустить большой список, лучше пропустить больший блок с удаленным итерации его частей (вместо того, чтобы оплачивать затраты на передачу для каждого и того же элемента, прошедшего много раз больше, чем при использовании sub_blocks (параметры обрабатывают SER / DES (~ затраты pickle.dumps() + pickle.loads()) [на каждый вызов], опять же, с дополнительными затратами, которые снижают результирующую эффективность и ухудшают накладные расходы в расширенном, строгом законе Амдала)

#####################################################################################
#   some_work_en_block() wraps some SCALE'd amount of work, tuple-specified
def some_work_en_block( sub_block = ( [ None, ], 0, 1 ) ):
    return more_work_en_block( en_block = sub_block[0][sub_block[1]:sub_block[2]] )

Правильное определение размера количество экземпляров процесса:

aMaxNumOfProcessesThatMakesSenseToSPAWN = len( os.sched_getaffinity( 0 ) ) # never more

with Pool( aMaxNumOfProcessesThatMakesSenseToSPAWN ) as p:
     p.imap_unordered( more_work_en_block, [ ( aLIST,
                                               start,
                                               start + STEP
                                               )
                                           for start in range( 0, len( aLIST ), STEP ) ] )

И последнее, но не менее важное: ожидайте огромного повышения производительности от умного использования numpy интеллектуального векторизованного кода, лучше всего без повторного прохождения stati c, предварительно скопированных (во время реализации процесса), таким образом, оплачиваемых как разумные масштабированные, в данном случае неизбежные, их стоимость) BLOB-объекты, используемые в коде без передачи тех же данных посредством передачи параметров в векторизованном (очень эффективном для ЦП) виде в качестве данных только для чтения. Некоторые примеры того, как один может сделать ~ +500 x ускорение можно прочитать здесь или здесь , примерно но ~ +400 x ускорение или около случай примерно ~ +100 x ускорения , с некоторыми примерами некоторой проблемной изоляции сценария тестирования ios.

В любом случае, чем ближе будет издеваться чем больше будет соответствовать вашим фактическим рабочим нагрузкам, тем больше смысла будут иметь тесты (в масштабе и в производстве).

7 голосов
/ 24 февраля 2020

ваша рабочая функция заканчивается слишком рано:

In [2]: %timeit func(1)
335 µs ± 12.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

, поэтому вы в основном измеряете издержки многопроцессорной обработки.

измените свою рабочую функцию, чтобы выполнять больше работы, например l oop 1000 * 1000 раз, а не 1000 раз, вы увидите, что он снова масштабируется, 1000000 петли стоят примерно 0.4s на моей маме c, что достаточно высоко по сравнению с накладными расходами.

ниже результат теста для различных n на моей ма c, я использую Pool(4), так как у меня 4 ядра, тест выполняется только один раз, а не несколько раз, как %timeit, потому что разница незначительна:

speedup graph

Вы могли видеть, что коэффициент ускорения увеличивается пропорционально n, издержки многопроцессорной обработки распределяются при каждом вызове рабочей функции.

математика позади, предположим, что накладные расходы на вызов равны:

ratio = {time_{single} \over time_{mp}} = {cost_{work} * n \over {{cost_{work} * n \over p_{cores}} + cost_{overhead} * n}} = {1 \over {\dfrac{1}{p_{cores}} + {cost_{overhead} \over cost_{work}} }}

, если мы хотим ratio > 1:

image {cost_{overhead} \over cost_{work}} }">

приблизительно равно:

enter image description here

, что означает, если рабочая функция выполняется слишком быстро сравнивается с накладными расходами на вызов, multiprocessing не масштабируется.

...