Я обнаружил, что вы можете вызвать take()
, чтобы удалить план выполнения, оставив просто значения. См. Последнюю строку для соответствующего вызова.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import Row
conf = SparkConf().setAppName("myFirstApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = [Row(Z_0=0.0, Z_1=0.0)]
df = sc.parallelize(df).toDF()
for each in range(0,1400):
df = df.withColumn("Z_"+str(each+2), df['Z_'+str(each+1)]+1)
df = sc.parallelize(df.take(df.count())).toDF()
Мое утверждение в вопросе о том, что сборка мусора является проблемой, не совсем корректно. Существует разница между heap size
и used heap
. В исследовании с VisualVM было легко увидеть, что происходит сборка мусора, что уменьшает used heap
.
Мы видим ту проблему, с которой jvm сталкивается при обработке кода, размещенного ввопрос. К концу у нас нет места для движения. Наш heap size
является максимальным, и used heap
слишком велик на данный момент, и ничего для GC. Это расширение было связано не с данными, а с сохранением информации о происхождении данных. Что мне нужно было сделать, так это избавиться от всей этой линии, которая, честно говоря, не так уж полезна в этом контексте проблем, и сохранить только данные.
Ниже приведен профиль кода ответа. фрагмент выше. Даже с 1400 столбцами у нас мало проблем с хранением данных. ![enter image description here](https://i.stack.imgur.com/OUVoM.png)