Преобразование фрейма данных pyspark в фрейм данных pandas - PullRequest
0 голосов
/ 25 февраля 2019

У меня есть фрейм данных pyspark, где его размерность (28002528,21), и я попытался преобразовать его в фрейм данных pandas, используя следующую строку кода:

pd_df=spark_df.toPandas()

Я получил эту ошибку:

первая часть

Py4JJavaError: An error occurred while calling o170.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 39.0 failed 1 times, most recent failure: Lost task 3.0 in stage 39.0 (TID 89, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:552)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:256)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        ...
        ...

Caused by: java.lang.OutOfMemoryError: Java heap space
        ...
        ...    

вторая часть

Exception happened during processing of request from ('127.0.0.1', 56842)
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:56657)
Traceback (most recent call last):
        ...
        ...    
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:
        ...
        ...

, и я также попытался взять образец исходного кадра данных pyspark

smaple_pd_df=spark_df.sample(0.05).toPandas()

Я получил сообщение об ошибке, похоже, что первая часть только предыдущей ошибки

Ответы [ 2 ]

0 голосов
/ 05 апреля 2019

Что делает toPandas(), так это собирает весь фрейм данных в один узел (как объяснено в ответе @ ulmefors).

Точнее, он собирает его в драйвер.Конкретная опция, которую вы должны отрегулировать, - spark.driver.memory, увеличьте ее соответствующим образом.

В противном случае, если вы планируете делать дальнейшие преобразования на этом (довольно большом) кадре данных pandas, вы можете рассмотреть их вСначала запустите pyspark, а затем соберите (меньший) результат в драйвер, надеюсь, он уместится в памяти.

Более подробная информация доступна в документации по конфигурации Spark, здесь .

0 голосов
/ 25 февраля 2019

Вы получаете java.lang.OutOfMemoryError, что, вероятно, означает, что вы пытаетесь загрузить все данные в один узел, которому не хватает оперативной памяти для обработки всего DataFrame.Если вы используете поставщика облачных решений, такого как Databricks, попробуйте увеличить размер ОЗУ кластера.

...