Не существует единой оптимальной конфигурации для всех сценариев использования, но я дам вам всю эвристику, которую я собрал по своему опыту работы со Spark.
Больше разделов, чем у ядер
Во-первых,давайте констатируем очевидное.Вам нужно иметь больше разделов (= задач на данном этапе), чем у вас ядер, иначе некоторые ядра будут сидеть без дела и ничего не делать.Есть ли исключения из этого правила?Да:
- Вы также можете запускать несколько заданий параллельно.Допустим, у вас есть 1000 небольших наборов данных, и вам нужно применить некоторые преобразования к каждому из них независимо от других.Вы, вероятно, не хотите разбивать каждый набор данных на файлы размером 128 тыс., Однако вы можете запускать несколько заданий по 128 разделов параллельно, чтобы максимально увеличить количество ядер.Обратите внимание, что я знаю, как это сделать только за один шаг или в настраиваемом кластере YARN, установив
spark.scheduler.mode=FAIR
.Я никогда не пытался представить параллельные шаги EMR, и я не знаю, возможно ли это, это не обычная концепция YARN (но, опять же, вы можете сделать это в течение одного шага, если хотите). - вашзадачи сами распараллелены.Это абсолютно не обычный случай использования, и я не рекомендую вообще, но мне пришлось распараллелить некоторый классификационный код MXNet на Spark.Код Java создает процесс Python, который использует MXNet для прогнозирования, а затем возвращает результат в Java.Поскольку MXNet внутренне параллелен и довольно хорошо использует ядра, я обнаружил, что пропускная способность намного выше, поскольку у него максимально возможное количество машин (таким образом, используется наименьшее количество возможных экземпляров) и только два исполнителя (контейнера) на машину.Каждый исполнитель создавал отдельный процесс MXNet для обслуживания 4 задач Spark (разделов), и этого было достаточно, чтобы максимально использовать мой процессор.Не ограничивая количество процессов MXNet, центральный процессор постоянно подключался на 100% и тратил огромное количество времени на переключение контекста.
Хороший объем данных на раздел
Ведет небольшие разделычтобы замедлить работу, потому что между драйвером и ведомыми существует определенный объем связи, и это занимает много времени, если у вас есть 100 000 небольших задач.Если ваши задачи выполняются менее чем за 1 секунду, ваши разделы определенно слишком малы.
И наоборот, большие разделы представляют опасность для памяти, особенно во время перемешивания.Перемешивания очень требовательны к памяти и заставят вас много собирать мусор.Если ваши разделы слишком велики, вы увеличите риск нехватки памяти или, в лучшем случае, потратите 50% своего времени в GC.2 ГБ в сериализованной форме является абсолютным пределом для размера раздела, поскольку Spark использует утилиту Java IO, которая поддерживается байтовым массивом, который может содержать только элементы 2^31 - 1
(размер int
).
Как правило, я рекомендую около 25MB-70MB в сжатом виде, если вы делаете случайные записи (в основном речь идет о JSON и текстовых данных здесь).
Трансляции
Если вам нужнодля трансляции некоторых объектов всем вашим исполнителям (например, фильтр Блума для уменьшения размера набора данных перед его перетасовкой), количество требуемых контейнеров будет зависеть от объема памяти, который вы хотите использовать на каждой машине длядержите ваши трансляции.Действительно, объект будет транслироваться один раз для каждого исполнителя, поэтому объем передаваемых данных на машину равен object_size * num_executors / num_ec2_instances
в предположении однородного кластера.Стоимость сети также увеличивается с увеличением количества контейнеров, так как объект должен быть передан несколько раз на каждый экземпляр EC2.
Однако, у меня был случай, когда мой вещаемый объект представлял собой логистическую модель, в которой использовались некоторые внутренниеизменчивое состояние во время классификации.Это означало, что метод предсказания был синхронизирован и что все потоки в моем контейнере боролись за доступ к этой блокировке.Увеличив количество контейнеров (и, следовательно, затраты на память и сеть для моей трансляции), я ускорил работу в 4 раза.
Резюме
Количество разделов скорее зависит от размера данных, чем от числа доступных ядер.Если ваши данные не требуют более 200 разделов, то просто не берите кластер размером более 200 ядер, вы, вероятно, не получите сколько-нибудь значительного ускорения от увеличения количества разделов и ядер, если разделы уже имеют достаточно хороший размер.
Пока ваши данные хорошего размера и ваши разделы сбалансированы, единственная оставшаяся эвристика:
- использует как минимум столько же ядер, сколько разделы
- runпараллельное выполнение нескольких заданий (если вы можете и), если вы хотите увеличить пропускную способность, но ваши разделы уже имеют большой размер
- избегайте выполнения многопоточного кода внутри ваших задач, но в тех редких случаях, для которых вам нужноэто, рассмотрите возможность иметь меньше разделов, чем ядер, в противоположность тому, что мы говорили ранее.
- чем больше контейнеров у вас, тем дороже становится широковещание (больше сетевой активности во время широковещания и больше использования памяти до его уничтожения).Если ваш транслируемый объект полностью неизменен, попробуйте иметь как можно меньше контейнеров.Если ваш контейнер имеет некоторое внутреннее состояние и требует блокировки, слишком большое количество потоков в контейнере может увеличить конфликт и замедлить работу.