Многопроцессорная обработка Python: понимание логики `chunksize` - PullRequest
0 голосов
/ 12 декабря 2018

Какие факторы определяют оптимальный chunksize аргумент для таких методов, как multiprocessing.Pool.map()?Метод .map(), кажется, использует произвольную эвристику для размера по умолчанию (объяснено ниже);что мотивирует этот выбор и существует ли более вдумчивый подход, основанный на определенной ситуации / установке?

Пример - скажем, что я:

  • Передача iterable в .map()~ 15 миллионов элементов;
  • Работа на машине с 24 ядрами и использование по умолчанию processes = os.cpu_count() в пределах multiprocessing.Pool().

Мое наивное мышлениедать каждому из 24 работников одинаковый размер, то есть 15_000_000 / 24 или 625 000.Большие куски должны уменьшить текучесть кадров / накладные расходы при полном использовании всех работников.Но, похоже, что здесь отсутствуют некоторые потенциальные недостатки предоставления больших партий каждому работнику.Это неполная картина, и что я упускаю?


Часть моего вопроса проистекает из логики по умолчанию, если chunksize=None: оба .map() и .starmap() вызывают .map_async(), который выглядит так:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

Какая логика стоит за divmod(len(iterable), len(self._pool) * 4)?Это означает, что размер фрагмента будет ближе к 15_000_000 / (24 * 4) == 156_250.Каково намерение умножить len(self._pool) на 4?

Это делает полученный размер фрагмента в 4 раз меньше, чем моя "наивная логика" сверху, которая состоит из простого деления длины итерируемого числа рабочих в pool._pool.

Наконец, есть еще этот фрагмент из документации по Python на .imap(), который еще больше пробуждает мое любопытство:

Аргумент chunksize такой жекак тот, который используется методом map().Для очень длинных итераций использование большого значения для chunksize может сделать выполнение задания намного быстрее, чем использование значения по умолчанию, равного 1.


Связанный ответэто полезно, но слишком высокоуровнево: Многопроцессорность Python: почему большие куски медленнее? .

Ответы [ 3 ]

0 голосов
/ 04 января 2019

Краткий ответ

Алгоритм пула chunksize - это эвристический алгоритм.Он предоставляет простое решение для всех возможных проблемных сценариев, которые вы пытаетесь внедрить в методы пула.Как следствие, он не может быть оптимизирован для любого конкретного сценария.

Алгоритм произвольно делит итерируемое примерно на четыре раза больше фрагментов, чем наивный подход.Чем больше блоков, тем больше накладных расходов, но повышается гибкость планирования.Как этот ответ покажет, это приводит к более высокой загрузке работника в среднем, но без гарантии более короткого общего времени вычислений для каждого случая.

«Приятно знать, - подумаете вы, - но как знание этого помогает мне в моих конкретных проблемах с многопроцессорностью?»Ну, это не так.Более честный короткий ответ: «короткого ответа нет», «многопроцессорная обработка сложна» и «это зависит».Наблюдаемый симптом может иметь разные корни даже для сходных сценариев.

Этот ответ пытается дать вам основные понятия, помогающие вам получить более четкое представление о планируемом черном ящике пула.Он также пытается дать вам некоторые базовые инструменты для распознавания и предотвращения потенциальных обрывов, поскольку они связаны с размером фрагмента.

Содержание

ЧастьI

  1. Определения
  2. Цели распараллеливания
  3. Сценарии распараллеливания
  4. Риски Chunksize> 1
  5. Алгоритм размера пула
  6. Количественная эффективность алгоритма

    6.1 Модели

    6.2 Параллельное расписание

    6.3 Эффективности

    6.3.1 Абсолютная эффективность распределения (ADE)

    6.3.2 Относительная эффективность распределения (RDE)

Часть II

Наивный и Алгоритм Chunksize-пула Проверка реальности Заключение

Сначала необходимо уточнить некоторые важные термины.


1.Определения


Чанк

Чанк здесь является частью iterable -аргумента, указанного в вызове метода пула.Как вычисляется размер фрагмента и какие последствия это может иметь, - тема этого ответа.


Задача

Физическое представление задачи в рабочем процессес точки зрения данных можно увидеть на рисунке ниже.

figure0

На рисунке показан пример вызова pool.map(), отображаемого вдоль строки кода, взятой из функции multiprocessing.pool.worker, где заданиепрочитанное с inqueue распаковывается.worker - основная функция main в MainThread пула-рабочего процесса.Аргумент func, указанный в методе пула, будет совпадать только с переменной func внутри функции worker для методов с одним вызовом, таких как apply_async и для imap с chunksize=1.Для остальных методов пула с параметром chunksize функция обработки func будет функцией отображения (mapstar или starmapstar).Эта функция отображает указанный пользователем func -параметр для каждого элемента передаваемого фрагмента итерируемого (-> "map-tasks").Время, которое требуется, определяет задачу также как единицу работы .


Taskel

В то время как использование слова "задача" для обработки целого одного фрагмента соответствует коду внутри multiprocessing.pool, нет указания, как следует ссылаться на одиночный вызов на указанный пользователем func, с одним элементом блока в качестве аргумента (ов).Чтобы избежать путаницы, возникающей из-за конфликтов имен (подумайте о maxtasksperchild -параметре для __init__ -метода пула), этот ответ будет относиться к отдельным единицам работы внутри задачи как taskel .

A taskel (из task + el ement) - это наименьшая единица работы в task .Это однократное выполнение функции, указанной с помощью func -параметра Pool -метода, вызываемой с аргументами, полученными из одного элемента переданного чанка .Задача состоит из chunksize Taskels .


Затраты на параллелизацию (PO)

PO состоит из внутренних издержек Python и служебных данных для межпроцессного взаимодействия (IPC).Служебная нагрузка для каждой задачи в Python поставляется с кодом, необходимым для упаковки и распаковки задач и их результатов.Служебная нагрузка IPC сопровождается необходимой синхронизацией потоков и копированием данных между различными адресными пространствами (требуется два шага копирования: parent -> queue -> child).Объем накладных расходов IPC зависит от операционной системы, аппаратного обеспечения и размера данных, что затрудняет обобщение воздействия.


2.Цели распараллеливания

При использовании многопроцессорной обработки нашей общей целью (очевидно) является минимизация общего времени обработки для всех задач.Чтобы достичь этой общей цели, наша техническая цель должна быть , оптимизирующей использование аппаратных ресурсов .

Некоторые важные подцели для достижения технической цели:

  • минимизируют издержки распараллеливания (наиболее известные, но не одни: IPC )
  • высокая загрузка во всех процессорных ядрах
  • ограничение использования памяти для предотвращения чрезмерного подкачки ОС ( trashing )

Сначала задачи требуютчтобы быть достаточно сложным (интенсивным) в вычислительном отношении, чтобы заработать ПО, которое мы должны заплатить за распараллеливание.Актуальность ПО уменьшается с увеличением абсолютного времени вычислений на одно задание.Или, другими словами, чем больше абсолютное время вычислений на одно задание для вашей задачи, тем менее значимым становится потребность в сокращении ПО.Если ваши вычисления займут несколько часов на одну задачу, накладные расходы IPC будут незначительными по сравнению с ними.Основной задачей здесь является предотвращение простоя рабочих процессов после распределения всех задач.Держа все ядра загруженными, значит, мы распараллеливаемся как можно больше.


3.Сценарии распараллеливания

Какие факторы определяют оптимальный аргумент размера фрагмента для таких методов, как multiprocessing.Pool.map ()

Основным вопросом является то, сколько времени может вычисляться варьируются по нашим разным задачам.Чтобы назвать это, выбор оптимального размера фрагмента определяется ...

Коэффициент вариации ( CV ) для времени вычисления на одно задание.

Два экстремальных сценария в масштабе, следующие из степени этого отклонения:

  1. Для всех заданий требуется одинаковое время вычисления.
  2. Задание может занять несколько секунд или дней.

Для лучшей запоминаемости я буду ссылаться на эти сценарии как:

  1. Плотный сценарий
  2. Широкий сценарий


Плотный сценарий

В Плотный сценарий это будетЖелательно распределить все задачи одновременно, чтобы свести к минимуму необходимые IPC и переключение контекста.Это означает, что мы хотим создать только столько кусков, сколько рабочих процессов существует.Как уже говорилось выше, вес PO увеличивается с сокращением времени вычислений на одну задачу.

Для максимальной пропускной способности мы также хотим, чтобы все рабочие процессы были заняты до тех пор, пока не будут выполнены все задачи (без рабочих на холостом ходу).Для этой цели распределенные чанки должны быть одинакового размера или близки к.


Широкий сценарий

Основным примером для Wide Scenario будет проблема оптимизации, когда результаты либо быстро сходятся, либо вычисление может занять часы, если не дни.Обычно неясно, какую смесь «легких задач» и «тяжелых задач» будет содержать задание в таком случае, поэтому не рекомендуется распределять слишком много задач одновременно в пакете задач.Распределение меньшего количества задач одновременно, чем это возможно, означает увеличение гибкости планирования.Это необходимо здесь для достижения нашей подцели высокой эффективности использования всех ядер.

Если Pool методы, по умолчанию, будут полностью оптимизированы для плотного сценария, они будут все чаще создавать субоптимальные временные характеристики для каждой проблемы.расположен ближе к широкому сценарию.


4.Риски Chunksize> 1

Рассмотрим этот упрощенный пример псевдокода с Wide Scenario -записью, который мы хотим передать в метод пула:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

Вместо реальных значений мы делаем вид, что видим необходимое время вычислений в секундах, для простоты только 1 минута или 1 день.Мы предполагаем, что в пуле четыре рабочих процесса (на четырех ядрах), а chunksize имеет значение 2.Поскольку порядок будет соблюден, куски, отправляемые рабочим, будут следующими:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

Поскольку у нас достаточно рабочих и время вычислений достаточно велико, мы можем сказать, что каждый рабочий процесс получитПрежде всего, над тем, чтобы работать над этим.(Это не должно иметь место для быстрого выполнения задач).Кроме того, можно сказать, что вся обработка займет около 86400 + 60 секунд, поскольку это наибольшее общее время вычислений для чанка в этом искусственном сценарии, и мы распределяем чанки только один раз.

Теперь рассмотрим эту итерацию, которая имееттолько один элемент меняет свою позицию по сравнению с предыдущим итерируемым:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

... и соответствующими кусками:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

Просто неудача с сортировкой нашего итерируемого почти удвоилась (86400 + 86400) наше общее время обработки!Рабочий, получивший порочный (86400, 86400) -часть, блокирует вторую тяжелую задачу в своей задаче из-за того, что ее не распределили одному из рабочих на холостом ходу, уже покончившим с (60, 60) -частями.Мы, очевидно, не рискнули бы таким неприятным исходом, если бы установили chunksize=1.

Это риск больших кусков.С более высокими размерами блоков мы торгуем гибкостью планирования за меньшие накладные расходы, и в случаях, как указано выше, это плохая сделка.

Как мы увидим в главе 6.Количественная оценка эффективности алгоритма , большие размеры также могут привести к неоптимальным результатам для плотных сценариев .


5.Алгоритм пула Chunksize-Algorithm

Ниже вы найдете слегка измененную версию алгоритма внутри исходного кода.Как видите, я обрезал нижнюю часть и обернул ее в функцию для внешнего вычисления аргумента chunksize.Я также заменил 4 параметром factor и передал вызовы len() на внешний подряд.

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

Чтобы убедиться, что мы все на одной странице, вот что делает divmod:

divmod(x, y) - встроенная функция, которая возвращает (x//y, x%y).x // y - это деление по полу, которое возвращает округленное вниз число от x / y, тогда как x % y - это операция по модулю, возвращающая остаток от x / y.Следовательно, например, divmod(10, 3) возвращает (3, 1).

Теперь, когда вы посмотрите на chunksize, extra = divmod(len_iterable, n_workers * 4), вы заметите n_workers здесь есть делитель y в x / y и умножение на 4, бездальнейшая корректировка через if extra: chunksize +=1 позже приводит к начальному размеру , по крайней мере, , в четыре раза меньшему (для len_iterable >= n_workers * 4), чем было бы в противном случае.

Для просмотра эффекта умножения на4 для промежуточного результата размера фрагмента рассмотрим эту функцию:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

Приведенная выше функция вычисляет наивный размер фрагмента (cs_naive) и размер фрагмента первого шага алгоритма размера фрагмента пула (cs_pool1), а также размер фрагмента для полного алгоритма пула (cs_pool2).Далее он вычисляет действительные факторы rf_pool1 = cs_naive / cs_pool1 и rf_pool2 = cs_naive / cs_pool2, которые говорят нам, во сколько раз наивно рассчитанные размеры фрагментов больше, чем внутренние версии пула.

Ниже вы видите две фигуры, созданные с помощью этой функции.Левый рисунок просто показывает размеры фрагментов для n_workers=4 вплоть до итерируемой длины 500.На правом рисунке показаны значения для rf_pool1.Для итерируемой длины 16 реальный коэффициент становится >=4 (для len_iterable >= n_workers * 4), а его максимальное значение составляет 7 для итерируемой длины 28-31.Это значительное отклонение от исходного фактора 4, к которому сходится алгоритм для более длинных итераций.'Longer' здесь является относительным и зависит от числа указанных рабочих.

figure1

Помните, что для размера cs_pool1 по-прежнему отсутствует настройка extra состаток от divmod содержится в cs_pool2 из полного алгоритма.

Алгоритм продолжается с:

if extra:
    chunksize += 1

Теперь в случаях, когда было , это aОстальное (extra от операции divmod), увеличение размера фрагмента на 1, очевидно, не может сработать для каждой задачи.В конце концов, если бы это было так, не было бы остатка для начала.

Как вы можете видеть на рисунках ниже, " дополнительная обработка " приводит к тому, что реальный коэффициент для rf_pool2 теперь сходится к 4 от ниже 4, и отклонение несколько плавнее.Стандартное отклонение для n_workers=4 и len_iterable=500 падает с 0.5233 для rf_pool1 до 0.4115 для rf_pool2.

figure2

В конце концовувеличение chunksize на 1 приводит к тому, что последнее переданное задание имеет размер len_iterable % chunksize or chunksize.

Чем интереснее и как мы увидим позже, тем более значимым будет эффект дополнительная обработка однако может наблюдаться для числа генерируемых кусков (n_chunks).Для достаточно длинных итераций завершенный алгоритм пула chunksize (n_pool2 на рисунке ниже) стабилизирует количество кусков на n_chunks == n_workers * 4.Напротив, наивный алгоритм (после первоначальной отрыжки) продолжает чередоваться между n_chunks == n_workers и n_chunks == n_workers + 1 по мере увеличения длины повторяемого элемента.

figure3

Ниже вы найдете две улучшенные информационные функции для пула и простой алгоритм chunksize.Вывод этих функций будет необходим в следующей главе.

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

Не смущайтесь, вероятно, неожиданному взгляду calc_naive_chunksize_info.extra из divmod не используется для вычисления размера фрагмента.

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6.Количественная оценка эффективности алгоритма

Теперь, после того, как мы увидели, как выходные данные алгоритма chunksize Pool выглядят иначе, чем выходные данные наивного алгоритма ...

  • Как определить, действительно ли подход Пула улучшает что-то?
  • И что именно может это что-то быть?

Как показано в предыдущей главе, для более длинных итераций (большее количество задач) алгоритм пула chunksize-алгоритм приблизительно делит итерируемое на четыре раза больше фрагментов, чемнаивный метод.Меньшие куски означают больше задач, а больше задач означают больше Накладные расходы на параллелизацию (PO) , стоимость, которую необходимо сопоставить с преимуществом повышенной гибкости планирования (вспомните "Риски размера кусков> 1" ).

По довольно очевидным причинам основной алгоритм пула chunksize не может сравниться с нами по гибкости планирования и PO .Издержки IPC зависят от операционной системы, аппаратного обеспечения и размера данных.Алгоритм не может знать, на каком оборудовании мы запускаем наш код, и не имеет понятия, сколько времени займет выполнение задачи.Это эвристическое обеспечение основных функций для всех возможных сценариев.Это означает, что он не может быть оптимизирован для какого-либо конкретного сценария.Как упоминалось ранее, PO также становится все менее и менее важным с увеличением времени вычислений на одно задание (отрицательная корреляция).

Когда вы вспоминаете Цели распараллеливания из главы 2одной из ключевых точек было:

  • высокая загрузка всех процессорных ядер

Ранее упомянутое что-то , алгоритм пула chunksize-алгоритм может попытаться улучшить - это минимизация бездействующих рабочих процессов , соответственно использование процессорных ядер .

Повторяющийся вопрос о SO в отношении multiprocessing.Pool спрашивают люди, интересующиеся неиспользуемыми ядрами / простоями рабочих процессов в ситуациях, когда можно ожидать, что все рабочие процессы заняты.Хотя на это может быть много причин, простаивающие рабочие процессы ближе к концу вычислений - это наблюдение, которое мы часто можем сделать, даже с плотными сценариями (равное время вычислений на одно задание) в тех случаях, когда число работников составляетне делитель количества кусков (n_chunks % n_workers > 0).

Вопрос теперь таков:

Как мы можем практически перевести наше понимание размеров кусков во что-то, что позволяет нам объяснить наблюдаемое использование работником или даже сравнить эффективность различных алгоритмов вчто касается?


6.1 Модели

Для более глубокого понимания здесь нам нужна форма абстракции параллельных вычислений, которая упрощает слишком сложную реальность до управляемой степенисложность, сохраняя значение в определенных границах.Такая абстракция называется модель .Реализация такой « модели распараллеливания» (PM) генерирует метаданные (временные метки), отображаемые рабочим, как реальные вычисления, если бы данные собирались.Сгенерированные моделью метаданные позволяют прогнозировать метрики параллельных вычислений при определенных ограничениях.

figure4

Одна из двух подмоделей в рамках определенного здесь PM - это модель распределения (DM) . DM объясняет, как атомные единицы работы (задачи) распределяются по параллельным рабочим и времени , когда нет других факторов, кроме соответствующего алгоритма chunksize, количества рабочих, входных данных.повторяемость (количество задач) и продолжительность их вычисления.Это означает, что любая форма издержек не включена.

Для получения полной PM , DM расширяется с издержкамиМодель (OM) , представляющая различные формы издержки распараллеливания (PO) .Такая модель должна быть откалибрована для каждого узла индивидуально (аппаратные, ОС-зависимости).Сколько форм служебных данных представлено в OM , оставлено открытым, и поэтому может существовать несколько OM с различной степенью сложности.Какой уровень точности необходим для реализации OM , определяется общим весом PO для конкретного вычисления.Более короткие задачи приводят к увеличению веса PO , что, в свою очередь, требует более точного OM , если мы пытались предсказать Эффективность распараллеливания (PE).


6.2 Параллельное расписание (PS)

Параллельное расписание - это двумерное представление параллельных вычислений, где ось X представляет время, а ось Y представляет пул параллельных рабочих.Число рабочих и общее время вычислений отмечают протяженность прямоугольника, в котором нарисованы меньшие прямоугольники. Эти меньшие прямоугольники представляют атомные единицы работы (задачи).

Ниже вы видите визуализацию PS , построенный с использованием данных DM алгоритма размера пула для плотного сценария .

figure5

  • Ось X разделена на равные единицы времени, где каждая единица обозначает время вычисления, необходимое для задачи.
  • Ось Y делится на количество рабочих процессов, которые использует пул.
  • В данном случае таскел отображается как наименьший прямоугольник голубого цвета, помещенный во временную шкалу (график)анонимного рабочего процесса.
  • Задача - это одна или несколько задач на временной шкале рабочего, непрерывно выделенных одним и тем же оттенком.
  • Единицы времени холостого хода представлены с помощью красных плиток.
  • Параллельное расписание разбито на секции.Последний раздел является хвостовым.

Названия составных частей можно увидеть на рисунке ниже.

figure6

В комплекте PM , включая OM , Холостой ход не ограничен хвостом, но также включает пространство между задачами и даже между заданиями.


6.3 Эффективность

Примечание:

С более ранних версий этого ответа "Эффективность распараллеливания (PE)" былапереименован в «Эффективность распределения (DE)». PE теперь относится к эффективности, включающей накладные расходы.

Представленные выше модели позволяют количественно оценить коэффициент использования работника.Мы можем различить:

  • Эффективность распределения (DE) - рассчитывается с помощью DM (или упрощенного метода для плотного сценария ).
  • Эффективность распараллеливания (PE) - либо рассчитывается с помощью калиброванного PM (прогноз), либо рассчитывается на основе метаданных реальных вычислений.

Важно отметить, что вычисленные коэффициенты полезного действия не автоматически коррелируют с более быстрым общим вычислением для данной проблемы распараллеливания.Использование работника в этом контексте различает только работника, у которого есть начальная, но незаконченная задача, и работника, у которого нет такой «открытой» задачи.Это означает, что возможный холостой ход в течение промежутка времени для зарегистрированной задачи не зарегистрирован.

Все вышеупомянутые эффективности в основном получены путем вычисления коэффициента деления Busy Share / Параллельное расписание .Разница между DE и PE заключается в том, что занятый ресурс занимает меньшую часть общего параллельного расписания для расширенных служебных данных PM .

В этом ответе далее будет обсуждаться только простой метод расчета DE для плотного сценария.Этого достаточно для сравнения различных алгоритмов chunksize, поскольку ...

  1. ... DM является частью PM , которая изменяетсяс различными используемыми алгоритмами chunksize.
  2. ... Плотный сценарий с равными длительностями вычислений на одно задание изображает «стабильное состояние», для которого эти промежутки времени выпадают из уравнения.Любой другой сценарий может привести к случайным результатам, так как порядок задач будет иметь значение.

6.3.1 Абсолютная эффективность распределения (ADE)

Эту базовую эффективность можно рассчитатьв общем случае путем деления занятой доли на весь потенциал параллельного расписания :

Абсолютная эффективность распределения (ADE) = Занятое распределение / Параллельное расписание

Для Плотного сценария , упрощенный код расчета выглядит следующим образом:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

Если нет Холостой ход , Занятая доля будет равна до Параллельное расписание , следовательно, мы получаем ADE 100%.В нашей упрощенной модели это сценарий, в котором все доступные процессы будут заняты все время, необходимое для обработки всех задач.Другими словами, вся работа фактически распараллеливается на 100 процентов.

Но почему я продолжаю ссылаться на PE как absolute PE здесь?

Чтобы понять это, мы должны рассмотреть возможный случай для размера фрагмента (cs), который обеспечивает максимальную гибкость планирования (также может быть число горцев. Совпадение?):

___________________________________ ~ ONE ~ ___________________________________

Если у нас, например, будет четыре рабочих процесса и 37 задач, рабочие будут работать на холостом ходу даже с chunksize=1, простопотому что n_workers=4 не является делителем 37. Остальная часть деления 37/4 равна 1. Эту единственную оставшуюся задачу придется обрабатывать одному работнику, а остальные три - на холостом ходу.

Аналогично,все еще будет один работающий на холостом ходу с 39 задачами, как вы можете видеть на рисунке ниже.

figure7

Когда вы сравниваете верхний ПараллПо расписанию для chunksize=1 с приведенной ниже версией для chunksize=3 вы заметите, что верхнее Параллельное расписание меньше, временная шкала на оси X короче.Теперь должно стать очевидным, как большие куски неожиданно также могут привести к увеличению общего времени вычислений, даже для Плотных сценариев .

Но почему бы просто не использоватьдлина оси X для расчетов эффективности?

Поскольку накладные расходы не содержатся в этой модели.Это будет отличаться для обоих кусков, следовательно, ось X не является прямо сопоставимой.Накладные расходы все еще могут привести к увеличению общего времени вычислений, как показано в case 2 на рисунке ниже.

figure8


6.3.2 Относительная эффективность распределения (RDE)

Значение ADE не содержит информации, если возможно распределение задач лучше с размером фрагмента 1. Лучше здесь все еще означает меньшую Холостую долю .

Чтобы получить значение DE , настроенное на максимально возможное значение DE мы должны разделить рассмотренную ADE на ADE , полученную нами для chunksize=1.

Относительная эффективность распределения (RDE) = ADE_cs_x / ADE_cs_1

Вот как это выглядит в коде:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE Как определено здесь, по сути это рассказ о хвосте Параллельное расписание . RDE зависит от максимально эффективного размера кусочка, содержащегося в хвосте.(Этот хвост может иметь длину оси x chunksize или last_chunk.) Это приводит к тому, что RDE естественным образом сходится к 100% (даже) для всех видов "хвостов", напримерпоказано на рисунке ниже.

figure9

Низкий RDE ...

  • является сильным намеком на потенциал оптимизации.
  • естественно становится менее вероятным для более длинных итераций, потому что относительная хвостовая часть общего Параллельного расписания уменьшается.

найти часть II этого ответа здесь ниже .

0 голосов
/ 21 февраля 2019

Об этом ответе

Этот ответ является частью II принятого ответа выше .


7.Алгоритм наивного размера и пула

Прежде чем углубляться в детали, рассмотрим две картинки ниже.Для диапазона различной длины iterable они показывают, как два сравниваемых алгоритма разделяют переданный iterable (к тому времени это будет последовательность) и как могут быть распределены результирующие задачи.Порядок работников является случайным, и количество распределенных задач на одного работника в действительности может отличаться от этого изображения для легких задач и / или задач в широком сценарии.Как упоминалось ранее, накладные расходы также не включены сюда.Однако для достаточно тяжелых задач в плотном сценарии с пренебрежимо малым размером передаваемых данных реальные вычисления дают очень похожую картину.

cs_4_50

cs_200_250

Asкак показано в главе " 5. Алгоритм размера пула ", при использовании алгоритма размера пула число чанков стабилизируется на n_chunks == n_workers * 4 для достаточно больших итераций, при этом он продолжает переключаться между n_chunks == n_workers и n_chunks == n_workers + 1с наивным подходом.Для наивного алгоритма применяется: Поскольку n_chunks % n_workers == 1 равен True для n_chunks == n_workers + 1, будет создан новый раздел, в котором будет использоваться только один рабочий.

Наивный алгоритм Chunksize-Algorithm:

Вам может показаться, что вы создали задачи с одинаковым количеством рабочих, но это будет справедливо только в тех случаях, когда нет остатка для len_iterable / n_workers.Если является остатком, будет новый раздел с одной задачей для одного работника.В этот момент ваши вычисления больше не будут параллельными.

Ниже вы видите фигуру, аналогичную показанной в главе 5, но показывающую количество секций вместо количества фрагментов.Для полного алгоритма изменения размера пула (n_pool2) n_sections стабилизируется при печально известном жестко закодированном факторе 4.Для наивного алгоритма n_sections будет чередоваться от одного до двух.

figure10

Для алгоритма размера пула стабилизация в n_chunks = n_workers * 4 черезранее упомянутая дополнительная обработка , предотвращает создание нового раздела здесь и сохраняет Idling Share ограниченным одним рабочим для достаточного количества итераций.Не только это, но и алгоритм будет продолжать уменьшать относительный размер Idling Share , что приводит к значению RDE, приближающемуся к 100%.

«Достаточно долго» для n_workers=4это len_iterable=210 например.Для итераций, равных или превышающих это, Idling Share будет ограничен одним рабочим, черта, изначально утраченная из-за умножения 4 в алгоритме chunksize в первую очередь.

figure11

Наивный алгоритм уменьшения размера также сходится к 100%, но он делает это медленнее.Сходящийся эффект зависит исключительно от того, что относительная часть хвоста сжимается для случаев, когда будет две секции.Этот хвост только с одним занятым работником ограничен длиной оси x n_workers - 1, возможным максимальным остатком для len_iterable / n_workers.

Как фактические значения RDE отличаются для наивного и пула chunksize-алгоритма?

Ниже вы найдете две тепловые карты, показывающие значения RDE для всех повторяемых длин до 5000, для всех чисел рабочих от 2 до 100. Цветовая шкала идет от0,5 к 1 (50% -100%).Вы заметите намного больше темных областей (более низкие значения RDE) для наивного алгоритма в левой тепловой карте.В отличие от этого, алгоритм chunksize пула справа рисует гораздо более солнечную картинку.

figure12

Диагональный градиент нижних левых темных углов по сравнению с верхними- правые светлые углы, опять же показывающие зависимость от числа рабочих для того, что можно назвать «длинным итеративным».

Насколько плохо это может быть с каждым алгоритмом?

При использовании алгоритма chunksize пула значение RDE , равное 81,25%, является наименьшим значением для диапазона рабочих и повторяемых длин, указанных выше:

figure13

С наивным алгоритмом chunksize все может стать намного хуже.Самый низкий расчетный RDE здесь составляет 50,72%.В этом случае почти на половину времени вычислений работает только один рабочий!Итак, берегитесь, гордые владельцы Knights Landing .;)

figure14


8.Проверка реальности

В предыдущих главах мы рассматривали упрощенную модель для чисто математической задачи распределения, в которой не учитывались мелкие детали, которые делают многопроцессорность такой сложной темой.Чтобы лучше понять, насколько далеко может помочь только модель распределения (DM) , чтобы объяснить наблюдаемое использование работника в реальности, теперь мы немного рассмотрим параллельные графики, составленные с помощью real вычислений.

Настройка

На всех следующих графиках рассматриваются параллельные выполнения простой фиктивной функции, связанной с процессором, которая вызывается с различными аргументами, поэтому мы можем наблюдать, как нарисованное параллельное расписание изменяется в зависимости отвходные значения.«Работа» внутри этой функции состоит только из итерации по объекту диапазона.Этого уже достаточно, чтобы поддерживать ядро ​​занятым, так как мы передаем огромные числа. Опционально, функция получает несколько уникальных taskel дополнительных data, которые просто возвращаются без изменений.Поскольку каждое задание включает в себя один и тот же объем работы, мы все еще имеем дело с плотным сценарием.

Функция украшена оболочкой, использующей временные метки с разрешением ns (Python 3.7+).Временные метки используются для вычисления временного промежутка задачи и, следовательно, позволяют рисовать эмпирическое параллельное расписание.

@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper

Метод starmap пула также оформлен таким образом, что синхронизируется только сам вызов starmap,«Начало» и «конец» этого вызова определяют минимум и максимум на оси х создаваемого параллельного расписания.

Мы собираемся наблюдать вычисление 40 задач на четырех рабочих процессах на машине с этимиспецификации: Python 3.7.1, Ubuntu 18.04.2, процессор Intel® Core ™ i7-2600K @ 3.40 ГГц × 8

Входные значения, которые будут варьироваться, - это число итераций в цикле for (30 тыс., 30M, 600M) и дополнительно отправляемый размер данных (для каждого taskel, numpy-ndarray: 0 MiB, 50 MiB).

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

Показанные ниже прогоны были выбраны вручную, чтобы иметь такой же порядок чанков, чтобы выможет лучше определить различия по сравнению с параллельным расписанием из модели распределения, но не забывайте, что порядок, в котором работники получают свою задачу, не является детерминированным.

Прогноз DM

Повторениемодель распределения «предсказывает» параллельное расписание, как мы уже видели в главе 6.2:

figure15

1-й RUN: 30 000 итераций & 0MiB данных на одно задание

figure16

Наш первый запуск здесь очень короткий, задания очень легкие.Весь вызов pool.starmap() занял всего 14,5 мс.Вы заметите, что в отличие от DM , холостой ход не ограничивается хвостовой частью, но также имеет место между задачами и даже между заданиями.Это потому, что наше реальное расписание здесь естественно включает в себя все виды накладных расходов.Холостой ход здесь означает просто все вне задачи.Возможно реальное на холостом ходу во время таскель не захватывается, как уже упоминалось ранее.

Далее вы можете видеть, что не все работники получают свои задачи одновременно.Это связано с тем, что все работники получают питание через общий inqueue, и только один работник может читать из него одновременно.То же самое относится и к outqueue.Это может привести к большим расстройствам, как только вы передаете данные без предельных размеров, как мы увидим позже.

Кроме того, вы можете видеть, что, несмотря на тот факт, что каждое задание содержит одинаковый объем работы, фактический измеренный промежуток времени для задания точно варьируется.Задачам, раздаемым работнику-3 и работнику-4, требуется больше времени, чем обработанным первыми двумя работниками.Я подозреваю, что для этого прогона это связано с тем, что turbo boost больше не доступен в ядрах для worker-3/4 в этот момент, поэтому они выполняли свои задачи с более низкой тактовой частотой.

Все вычисления настолько легки, что аппаратные или операционные факторы хаоса могут резко исказить PS .Вычисление является «листом на ветру», и прогноз DM не имеет большого значения даже для теоретически подходящего сценария.

2-й RUN: 30M итераций и 0 МБ данных на одно задание

figure17

Увеличение числа итераций в цикле for с 30 000 до 30 миллионов приводит к реальному параллельному расписанию, которое близко к идеальному совпадению спредсказанный данными, предоставленными DM , ура!Вычисления на одно задание теперь достаточно тяжелые, чтобы маргинализовать части холостого хода в начале и между ними, оставляя видимой только большую долю холостого хода, которую предсказал DM .

3-й RUN: 30M итераций &Данные 50 МБ на одно задание

figure18

Сохранение 30M итераций, но дополнительно отправка 50 МБ на одно задание назад и вперед искажает изображение снова.Здесь эффект очереди хорошо виден.Worker-4 должен ждать дольше своей второй задачи, чем Worker-1.Теперь представьте себе этот график с 70 работниками!

В случае, если задачи в вычислительном отношении очень легки в вычислительном отношении, но предоставляют значительный объем данных в качестве полезной нагрузки, узкое место в одной общей очереди может предотвратить любые дополнительные преимущества добавления большего числа рабочих вПул, даже если они подкреплены физическими ядрами.В таком случае Worker-1 можно было бы выполнить со своей первой задачей и ожидать новой задачи еще до того, как Worker-40 получил свою первую задачу.

Теперь должно стать очевидным, почему время вычислений в Poolне всегда уменьшается линейно с количеством работников.Отправка относительно больших объемов данных по может привести к сценариям, в которых большую часть времени тратится на ожидание копирования данных в адресное пространство работника, и одновременно может подаваться только один работник.

4-й RUN: 600M итераций и 50 МБ данных на одно задание

figure19

Здесь мы снова отправляем 50 MiB, но увеличиваем количество итерацийот 30 до 600 м, что увеличивает общее время вычислений с 10 до 152 с.Нарисованное параллельное расписание снова близко к идеальному совпадению с прогнозируемым, накладные расходы при копировании данных маргинализируются.


9.Заключение

Обсуждаемое умножение на 4 повышает гибкость планирования, но также усиливает неравномерность распределений Taskel.Без этого умножения доля холостого хода будет ограничена одним рабочим даже для коротких итераций (для DM с плотным сценарием).Алгоритм пула chunksize требует, чтобы итераторы ввода были определенного размера, чтобы восстановить эту черту.

Как можно надеяться, этот ответ показал, что алгоритм пула chunksize приводит к лучшему использованию ядра в среднем по сравнению с наивным подходом,по крайней мере для среднего случая и так как длительные накладные расходы не рассматриваютсяНаивный алгоритм здесь может иметь эффективность распределения (DE) всего ~ 51%, в то время как алгоритм размера пула имеет минимальный уровень ~ 81%. DE однако не включает издержки распараллеливания (PO), такие как IPC.Глава 8 показала, что DE все еще может иметь большую предсказательную силу для плотного сценария с маргинальными накладными расходами.

Несмотря на то, что алгоритм пула chunksize-алгоритма достигает более высокого DE по сравнению с наивным подходом, он не обеспечивает оптимального распределения Taskel для каждого входного созвездия. В то время как простая статическая порцияалгоритм не может оптимизировать (включая издержки) эффективность распараллеливания (PE), нет внутренней причины, по которой он всегда не может обеспечить относительную эффективность распределения (RDE), равную 100%, что означает то же самое DE как и chunksize=1.Простой алгоритм chunksize состоит только из базовой математики и может "нарезать кусок" любым способом.

В отличие от реализации пула алгоритма "равного размера", "четного размера""алгоритм обеспечит RDE 100% для каждой комбинации len_iterable / n_workers.Алгоритм разделения на четные размеры будет немного сложнее реализовать в исходном коде пула, но его можно модулировать поверх существующего алгоритма, просто упаковав задачи извне (я сошлюсь здесь на случай, если я уроню Q / A накак это сделать).

0 голосов
/ 30 декабря 2018

Я думаю, что часть того, что вы упускаете, заключается в том, что ваша наивная оценка предполагает, что каждая единица работы занимает одинаковое количество времени, и в этом случае ваша стратегия будет наилучшей.Но если некоторые задания завершаются раньше, чем другие, некоторые ядра могут простаивать в ожидании завершения медленных заданий.

Таким образом, если разбить фрагменты на 4 раза больше кусков, то, если один блок завершится раньше, это ядро ​​можетзапустить следующий блок (в то время как другие ядра продолжают работать над более медленным блоком).

Я не знаю, почему они выбрали фактор 4 точно, но это был бы компромисс между минимизацией издержек кода карты.(для которого нужны как можно большие порции), а балансировка фрагментов занимает разное количество раз (для этого требуется наименьшая возможная порция).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...