Невозможно загрузить набор данных 25 ГБ в локальном режиме PySpark с 56 ГБ свободной памяти - PullRequest
3 голосов
/ 01 июля 2019

У меня проблемы с загрузкой и обработкой набора данных Parquet 25 ГБ (из сообщений stackoverflow.com) на одной мощной машине в локальном режиме с 12 ядрами / 64 ГБ ОЗУ.

У меня больше памяти на моей машине, которая свободна и выделена на pyspark, чем размер набора данных Parquet (не говоря уже о двух столбцах набора данных), и все же я не могу выполнить какие-либо операции с DataFrame, как только я загрузить его. Это сбивает с толку, и я не могу понять, что делать.

В частности, у меня есть набор данных Parquet, который составляет 25 ГБ:

$ du -sh data/stackoverflow/parquet/Posts.df.parquet

25G data/stackoverflow/parquet/Posts.df.parquet

У меня есть машина с 56 ГБ свободной оперативной памяти:

$ free -h

              total        used        free      shared  buff/cache   
available
Mem:            62G        4.7G         56G         23M        1.7G         
57G
Swap:           63G          0B         63G

Я настроил PySpark для использования 50 ГБ ОЗУ (попытался настроить maxResultSize безрезультатно).

Моя конфигурация выглядит так:

$ cat ~/spark/conf/spark-defaults.conf

spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
spark.driver.memory 50g
spark.jars ...
spark.executor.cores 12
spark.driver.maxResultSize 20g

Моя среда выглядит так:

$ cat ~/spark/conf/spark-env.sh

PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3
SPARK_WORKER_DIR=/nvm/spark/work
SPARK_LOCAL_DIRS=/nvm/spark/local
SPARK_WORKER_MEMORY=50g
SPARK_WORKER_CORES=12

Я загружаю данные так:

$ pyspark

>>> posts = spark.read.parquet('data/stackoverflow/parquet/Posts.df.parquet')

Он загружается нормально, но любая операция, в том числе если я сначала запускаю limit(10) в кадре данных, приводит к ошибке нехватки пространства.

>>> posts.limit(10)\
    .select('_ParentId','_Body')\
    .filter(posts._ParentId == 9915705)\
    .show()

[Stage 1:>                                                       (0 + 12) / 195]19/06/30 17:26:13 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 4)
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR Executor: Exception in task 5.0 in stage 1.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/06/30 17:26:13 ERROR Executor: Exception in task 10.0 in stage 1.0 (TID 11)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/06/30 17:26:13 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 7)
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 7,5,main]
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 11,5,main]
java.lang.OutOfMemoryError: Java heap space
...

Будет запущено следующее, предполагая, что проблема заключается в поле _Body (очевидно, самое большое):

>>> posts.limit(10).select('_Id').show()

+---+
|_Id|
+---+
|  4|
|  6|
|  7|
|  9|
| 11|
| 12|
| 13|
| 14|
| 16|
| 17|
+---+

Что мне делать? Я мог бы использовать EMR, но я хотел бы иметь возможность загружать этот набор данных локально, и это вполне разумно сделать в такой ситуации.

Ответы [ 2 ]

1 голос
/ 01 июля 2019

Фракция памяти по умолчанию для хранения и вычисления Spark - 0.6. Под вашим конфигом это будет 0.6 * 50GB = 30GB. Но представление данных в памяти может занимать больше места, чем сериализованная версия диска.

Пожалуйста, проверьте раздел Управление памятью , чтобы получить более подробную информацию.

0 голосов
/ 01 июля 2019

Вам нужно будет установить конфигурацию искровой памяти при запуске команды pyspark:

pyspark --conf spark.driver.memory=50g --conf spark.executor.pyspark.memory=50g

Проверьте это doc для конфигурации, которую нужно установить.

Возможно, вытакже необходимо выяснить, сколько исполнителей вам нужно, исходя из вашего оборудования.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...