У меня есть фрейм данных, который был создан как глобальная переменная (изменяемая).
var output
Эта переменная несколько раз обновляется внутри различных функций через объединения, как показано ниже.
def func1 = {
// some code
output = output.join(temp1, Seq("id"))
}
def func2 = {
// some code
output = output.join(temp2, Seq("id"))
}
После нескольких из вышеперечисленных вызовов функций output
передается другой функции performCalculation
, где выполняются некоторые вычисления. Мой вопрос: в зависимости от того, сколько раз в коде вызывается func1 и func2, я сталкиваюсь с ошибками OOM (org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 36 bytes of memory, got 0
) внутри performCalculation
.
Однако, если я запишу вывод до того, как он будет передан в performCalculation
, прочту его обратно в другую переменную и отправлю вторую переменную в performCalculation
, он будет работать нормально.
ouput.write.csv(path)
val newOutput = spark.read.csv(path)
performCalculation(newOutput)
Очевидно, что spark не любит, когда я использую изменяемые переменные, и мне приходится переписывать эту часть. Но я попытался очистить кэш и повторно кэшировать output
внутри func1
и func2
каждый раз, когда output
обновляется, но похоже, это не поможет.
output.unpersist()
output = output.join(temp2, Seq("id"))
output.cache()
Что я делаю не так и что я могу сделать, чтобы улучшить это.