ООМ в искровой обработке VAR - PullRequest
2 голосов
/ 05 апреля 2019

У меня есть фрейм данных, который был создан как глобальная переменная (изменяемая).

var output

Эта переменная несколько раз обновляется внутри различных функций через объединения, как показано ниже.

def func1 = {
// some code
output = output.join(temp1, Seq("id"))
}

def func2 = {
// some code
output = output.join(temp2, Seq("id"))
}

После нескольких из вышеперечисленных вызовов функций output передается другой функции performCalculation, где выполняются некоторые вычисления. Мой вопрос: в зависимости от того, сколько раз в коде вызывается func1 и func2, я сталкиваюсь с ошибками OOM (org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 36 bytes of memory, got 0) внутри performCalculation.

Однако, если я запишу вывод до того, как он будет передан в performCalculation, прочту его обратно в другую переменную и отправлю вторую переменную в performCalculation, он будет работать нормально.

ouput.write.csv(path)
val newOutput = spark.read.csv(path)
performCalculation(newOutput)

Очевидно, что spark не любит, когда я использую изменяемые переменные, и мне приходится переписывать эту часть. Но я попытался очистить кэш и повторно кэшировать output внутри func1 и func2 каждый раз, когда output обновляется, но похоже, это не поможет.

output.unpersist()
output = output.join(temp2, Seq("id"))
output.cache()

Что я делаю не так и что я могу сделать, чтобы улучшить это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...