Я пытаюсь выполнить операцию groupBy на фрейме данных в Spark Cloudera (2.1.0) на кластере из 7 узлов с общим объемом оперативной памяти около 512 ГБ. Мой код выглядит следующим образом.
ndf = ndf.repartition(20000)
by_user_df = ndf.groupBy(ndf.name) \
.agg(collect_list("file_name")) \
.withColumnRenamed('collect_list(file_name)', 'file_names')
by_user_df = by_user_df.repartition(20000)
by_user_df.count()
ndf - это фрейм данных, содержащий 2 столбца, идентификатор пользователя и имя файла. Я пытаюсь создать список имен файлов по идентификатору пользователя для передачи в CountVectorizer и кластеризации.
Я получаю следующую ошибку
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Из того, что я прочитал, это связано с выделением массива, большего, чем то, что виртуальная машина может обрабатывать в непрерывной памяти, или большего, чем системный максимум для размера массива. Многие из рекомендаций состоят в том, чтобы распараллелить больше, разбив на большее количество разделов.
У меня около 6 тысяч пользователей и около 7 тысяч имен файлов. Я заметил, что умирающий исполнитель проводит большую часть своего времени в сборке мусора.
Я пробовал следующее далеко:
- переразметить фрейм данных ndf и результирующий фрейм данных. Я пробовал до 60 КБ в аргументах перераспределения для каждого.
- Я установил "spark.sql.shuffle.partitions" с шагом до 20000
- Я увеличил память исполнителя до 25G
- Несмотря на то, что умирающий исполнитель не является драйвером, я также увеличил объем памяти драйвера до 25G.
В качестве обновления к этому вопросу: я понял, что в этом случае я делаю двоичную кластеризацию по данным, поэтому мне действительно нужно только одно из каждого имени файла. Изменение collect_list
на collect_set
оставило меня с выводом, который мне был нужен, и, по-видимому, был достаточно мал, чтобы работать в рамках заданных параметров. Я все еще собираюсь попытаться исправить первоначальный случай.