Может ли снижение параллелизма привести к отсутствию случайного разлива? - PullRequest
0 голосов
/ 07 ноября 2019

Рассмотрим пример:

У меня есть кластер с 5 узлами, и каждый узел имеет 64 ядра с 244 ГБ памяти.

Я решил запустить по 3 исполнителя на каждом узле и установить для ядер-исполнителей 21 и память для 80 ГБ, чтобы каждый исполнитель мог выполнять 21 задачу параллельно. Теперь рассмотрим 315 (63 * 5) разделов данных, из которых 314 разделов имеют размер 3 ГБ, но один из них - 30 ГБ (из-за перекоса данных).

Все исполнители, получившие разделы 3 ГБ, имеют 63 ГБ (21 * 3 =, поскольку каждый исполнитель может выполнять 21 задачу параллельно, а каждая задача занимает 3 ГБ пространства памяти).

Но одному исполнителю, получившему раздел 30 ГБ, потребуется 90 ГБ (20 * 3 + 30) памяти. Так будет ли этот исполнитель сначала выполнять 20 задач по 3 ГБ, а затем загружать задачу по 30 ГБ или он просто попытается загрузить 21 задачу и обнаружит, что для одной задачи он должен пролиться на диск? Если я установлю для executor-core только 15, то исполнителю, который получит раздел 30 ГБ, потребуется только 14 * 3 + 30 = 72 ГБ, и, следовательно, он не попадет на диск.

Значит, в этом случае уменьшенный параллелизм не приведет к разливу в случайном порядке?

1 Ответ

0 голосов
/ 07 ноября 2019

@ Venkat Dabri,

Не могли бы вы отформатировать вопросы с соответствующим возвратом каретки / пробелами?

Вот несколько указателей

Spark (Shuffle) MapЭтап ==> размер каждого раздела зависит от размера блока файловой системы. Например, если данные считываются из HDFS, каждый раздел будет пытаться располагать данные как можно ближе к 128MB , поэтому для входных данных число разделов = floor (количество файлов * blockize / 128 (на самом деле 122.07 используется как Mebibyte))

Теперь описываемый вами сценарий предназначен для Перемешанные данные в Редукторе ( Стадия результата )

Здесь блоки Обрабатываемые задачами редуктора называются Перемешанные блоки , и по умолчанию Spark (для API SQL / Core) запускает 200 задач редуктора

Теперь важно помнить, что Spark может удерживатьсяМакс. 2 ГБ , поэтому, если у вас слишком несколько разделов и один из них делает дистанционное извлечение блока случайного воспроизведения> 2 ГБ, вы увидите ошибку типа Size exceeds Integer.MAX_VALUE

Чтобы смягчить это, в рамках предела по умолчанию Spark использует множество оптимизаций (сжатие / вольфрам-сортировка-перемешивание и т. Д.), Но как разработчик мы можем попытаться перераспределить с искажением данных и настроить параллелизм по умолчанию

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...