У меня есть фрейм данных с 20 столбцами и 25 записями (небольшие стандартные данные. Размер файла = 7 КБ). Мне нужно выполнить несколько операций над циклом данных в цикле. Цикл работает отлично в течение нескольких секунд, как и ожидалось. Проблема в том, что когда он заканчивается, и я пытаюсь показать () или записать данные на диск, мой ЦП сильно нагружается в течение многих минут (15-20 минут) с высоким использованием памяти. Много раз я получаю переполнение стека или ошибку из памяти.
Мой метод main () выглядит следующим образом:
val spark = get_spark()
val i_file = args(1)
val df = spark.read
.format("csv")
.option("delimiter", ",")
.option("header", true)
.option("inferSchema","true")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.load(i_file)
var s_df = something.calculate(spark,df)
/////////problem starts here ///////////
s_df.repartition(col("cluster_id")) //tried without repartition also
.write.mode("overwrite")
.option("header", "true").csv("C:\\Workspace\\data\\out.csv")
И мой метод calc () в чем-то:
def calculate(spark: SparkSession, df: DataFrame): DataFrame = {
var newdf = init(spark, df) //some initialization on the dataframe
val t_count = 100
var t_i = 0
newdf.rdd.cache()
while (t_i < t_count) {
if(some codition){
newdf = calculate_type1(spark, newdf)
}else{
newdf = calculate_type2(spark, newdf)
}
t_i = t_i + 1
if(t_i === 50){
newdf.rdd.checkpoint()
}
}
return newdf
}
Мой анализ:
Заметил, что работает с меньшим количеством циклов, например, t_count = 2, все работает хорошо.
Я считаю, что проблема в том, что искра хранит граф в своей памяти и
пытается обработать график для генерации окончательного фрейма данных.
Я использую var, что неверно, так или иначе я должен использовать val и
используя leftfold или zip для обновления оригинального rdd. Но я борюсь
с этим. Может кто-нибудь помочь, пожалуйста. Большое спасибо !!!
- Мне нужен контрольно-пропускной пункт? Я не вижу никакой пользы от этого