Утечка памяти при добавлении итерационных столбцов Pyspark - PullRequest
0 голосов
/ 21 октября 2019

Я пытался выполнить итеративные вычисления для фреймов данных pyspark. Столбцы добавляются в df на основе предыдущих столбцов. Однако я отмечаю, что используемая память продолжает увеличиваться. Простой пример показан ниже.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import Row

conf = SparkConf().setAppName("myFirstApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = [Row(Z_0=0.0, Z_1=0.0)]
df = sc.parallelize(df).toDF()
for each in range(0,400):
    df = df.withColumn("Z_"+str(each+2), df['Z_'+str(each+1)]+1)

Насколько я понимаю, я на самом деле строю план выполнения, а не обязательно сами данные. Однако вызов выполнения df с collect(), count(), show() или преобразование в rdd или даже удаление df не освобождают память. Я видел 1,2 ГБ памяти для вышеуказанной задачи. Кажется, что сборщик мусора не может очистить предыдущие промежуточные df объекты или, возможно, что на эти объекты никогда не ссылаются.

есть лучший метод построения этого типа итеративного вычисления,или есть ли способ очистить эти промежуточные DF? Обратите внимание, что простой +1, встречающийся здесь, является лишь минимальным примером насмешки над гораздо более сложными вычислениями.

Ответы [ 2 ]

0 голосов
/ 30 октября 2019

Я обнаружил, что вы можете вызвать take(), чтобы удалить план выполнения, оставив просто значения. См. Последнюю строку для соответствующего вызова.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import Row

conf = SparkConf().setAppName("myFirstApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = [Row(Z_0=0.0, Z_1=0.0)]
df = sc.parallelize(df).toDF()
for each in range(0,1400):
    df = df.withColumn("Z_"+str(each+2), df['Z_'+str(each+1)]+1)
    df = sc.parallelize(df.take(df.count())).toDF()

Мое утверждение в вопросе о том, что сборка мусора является проблемой, не совсем корректно. Существует разница между heap size и used heap. В исследовании с VisualVM было легко увидеть, что происходит сборка мусора, что уменьшает used heap.
enter image description here Мы видим ту проблему, с которой jvm сталкивается при обработке кода, размещенного ввопрос. К концу у нас нет места для движения. Наш heap size является максимальным, и used heap слишком велик на данный момент, и ничего для GC. Это расширение было связано не с данными, а с сохранением информации о происхождении данных. Что мне нужно было сделать, так это избавиться от всей этой линии, которая, честно говоря, не так уж полезна в этом контексте проблем, и сохранить только данные.

Ниже приведен профиль кода ответа. фрагмент выше. Даже с 1400 столбцами у нас мало проблем с хранением данных. enter image description here

0 голосов
/ 21 октября 2019

Я имел дело с тем же и не нашел хорошего решения.

Как Временное решение: разделить приложение на множество файлов .py и выполнить их один за другим , что приведет к тому, что сборщик мусора освободит весь ненужный кеш.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...