Перекрестное объединение двух больших наборов данных в Spark - PullRequest
0 голосов
/ 12 января 2019

У меня есть 2 больших набора данных. Первый набор данных содержит около 130 миллионов записей.
Второй набор данных содержит около 40000 записей. Данные извлекаются из таблиц MySQL.

Мне нужно сделать кросс-соединение, но я получаю

java.sql.SQLException: GC overhead limit exceeded

Каков наилучший оптимальный метод для этого в Scala?

Ниже приведен фрагмент моего кода:

val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

df1 - больший набор данных, а df2 - меньший.

Ответы [ 3 ]

0 голосов
/ 14 января 2019

Согласитесь с Владимиром, подумайте о добавлении дополнительных очков.

см. MapStatus установить spark.sql.shuffle.partitions в 2001 ( старый подход ) (по умолчанию 200).

новый подход (spark.shuffle.minNumPartitionsToHighlyCompress), как Владимир сказал в ответе.

Почему это изменение? : MapStatus имеет 2000 жестко запрограммированных SPARK-24519

будет применяться другой алгоритм для обработки

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }

HighlyCompressedMapStatus:

Реализация MapStatus, которая хранит точный размер огромного блоки, которые больше, чем spark.shuffle.accurateBlockThreshold. Он хранит средний размер других непустых блоков, плюс растровое изображение для отслеживания, какие блоки пусты.

spark. Это помогает предотвратить ООМ, избегая недооценки размера блока случайного воспроизведения при извлечении блоков случайного воспроизведения.


CompressedMapStatus:

Реализация MapStatus, которая отслеживает размер каждого блока. Размер для каждого блока представлен один байт.

Также установите на spark-submit

--conf spark.yarn.executor.memoryOverhead=<10% of executor memory>  -- conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true 

в обоих случаях Сжатие будет использовать spark.io.compression.codec

Заключение : большие задачи должны использовать HighlyCompressedMapStatus, и накладные расходы памяти исполнителя могут составлять 10 процентов от объема памяти вашего исполнителя.

Далее, взгляните на настройка памяти памяти

0 голосов
/ 14 января 2019

Увеличьте SPARK_EXECUTOR_MEMORY до более высокого значения и переделите на большее количество разделов

0 голосов
/ 12 января 2019

130M * 40K = 52 триллиона записей - это 52 терабайта необходимой памяти для хранения этих данных, и это если мы предположим, что каждая запись составляет 1 байт, что, безусловно, не соответствует действительности. Если он составляет 64 байта (что, я думаю, тоже очень консервативная оценка), вам понадобится 3,32 петабайта (!) Памяти только для хранения данных. Это очень большое количество, поэтому если у вас нет очень большого кластера и очень быстрой сети внутри этого кластера, вы можете переосмыслить свой алгоритм, чтобы он работал.

При этом, когда вы делаете join из двух наборов данных / фреймов данных SQL, число разделов, которые Spark будет использовать для хранения результата объединения, контролируется свойством spark.sql.shuffle.partitions (см. здесь ). Возможно, вы захотите установить для него очень большое число и установить для числа исполнителей самое большое из возможных. Тогда вы сможете запустить обработку до конца.

Кроме того, вы можете обратиться к опции spark.shuffle.minNumPartitionsToHighlyCompress; если вы установите его меньше, чем количество случайных разделов, вы можете получить еще один прирост памяти. Обратите внимание, что эта опция была жестко заданной константой, установленной на 2000 до последней версии Spark, поэтому в зависимости от вашей среды вам просто нужно будет установить spark.sql.shuffle.partitions на значение больше 2000, чтобы использовать его.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...