Spark - java.lang.OutOfMemoryError: размер запрошенного массива превышает ограничение виртуальной машины - PullRequest
0 голосов
/ 03 мая 2018

Я пытаюсь выполнить операцию groupBy на фрейме данных в Spark Cloudera (2.1.0) на кластере из 7 узлов с общим объемом оперативной памяти около 512 ГБ. Мой код выглядит следующим образом.

ndf = ndf.repartition(20000)
by_user_df = ndf.groupBy(ndf.name) \
            .agg(collect_list("file_name")) \
            .withColumnRenamed('collect_list(file_name)', 'file_names')


by_user_df = by_user_df.repartition(20000)    
by_user_df.count()

ndf - это фрейм данных, содержащий 2 столбца, идентификатор пользователя и имя файла. Я пытаюсь создать список имен файлов по идентификатору пользователя для передачи в CountVectorizer и кластеризации.

Я получаю следующую ошибку

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Из того, что я прочитал, это связано с выделением массива, большего, чем то, что виртуальная машина может обрабатывать в непрерывной памяти, или большего, чем системный максимум для размера массива. Многие из рекомендаций состоят в том, чтобы распараллелить больше, разбив на большее количество разделов.

У меня около 6 тысяч пользователей и около 7 тысяч имен файлов. Я заметил, что умирающий исполнитель проводит большую часть своего времени в сборке мусора.

Я пробовал следующее далеко:

  1. переразметить фрейм данных ndf и результирующий фрейм данных. Я пробовал до 60 КБ в аргументах перераспределения для каждого.
  2. Я установил "spark.sql.shuffle.partitions" с шагом до 20000
  3. Я увеличил память исполнителя до 25G
  4. Несмотря на то, что умирающий исполнитель не является драйвером, я также увеличил объем памяти драйвера до 25G.

В качестве обновления к этому вопросу: я понял, что в этом случае я делаю двоичную кластеризацию по данным, поэтому мне действительно нужно только одно из каждого имени файла. Изменение collect_list на collect_set оставило меня с выводом, который мне был нужен, и, по-видимому, был достаточно мал, чтобы работать в рамках заданных параметров. Я все еще собираюсь попытаться исправить первоначальный случай.

1 Ответ

0 голосов
/ 03 мая 2018

Прежде всего, я не очень понимаю, зачем вам такая высокая ценность разделов. Я не знаю, сколько ядер у вас на каждом из 7 рабочих, но я сомневаюсь, что вам нужно более 200 разделов (чрезвычайно большое количество используемых вами разделов может фактически объяснить, почему ваши рабочие умирают из-за сборки мусора)

Ваша проблема выглядит как проблема с памятью в определениях JVM, поэтому я не вижу причин для увеличения памяти драйвера или рабочей памяти.

Я думаю, что вам нужно установить Xss или Xmx или MaxPermSize, как указано здесь: Как исправить ошибку "Запрашиваемый размер массива превышает ограничение виртуальной машины" в Java?

Для этого вам нужно использовать --conf spark.driver.extraJavaOptions и --conf spark.executor.extraJavaOptions при запуске spark.

Например:

--conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M " --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M "
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...