Мне нужно перебрать rdd, вынуть данные для обработки и поместить их в буфер списка.Когда я использую метод сбора, ошибка выглядит следующим образом: java.lang.OutOfMemoryError: Превышен лимит накладных расходов GC.В противном случае буфер списка пуст.
Это мой код:
val recoList = new ListBuffer[(String,String)]()
preUserProdcut.map(row =>(row._1,row._2))
.foreach(row => {
val recoItemList = new ListBuffer[String]()
val userId = row._1
val size = row._2.size
for (i <- 0 to(size -1)){
recoItemList.append(int2ItemIdMap.getOrElse(row._2(i).product,""))
}
recoList.append((RecoModelEnum.UserCF_fqd_01.modelId + "_" +userId.toString,recoItemList.mkString(",")))
println("############" + recoList.size) //1. here length is ok
})
println("############" + recoList.size) //2. this code doesn't be executed
Когда я использую метод сбора, код отлично работает на небольшом наборе данных
preUserProdcut.map(row =>(row._1,row._2))
.collect()
.foreach(row => {.........})
println("############" + recoList.size) //here length is ok
Однако при большом количестве данных, код ошибки для запуска на искровом кластере, ошибка выглядит следующим образом: Java.Lang.OutOfMemoryError: Превышен лимит накладных расходов GC.
Я попробовал несколько способов, а именно:
sparkConf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER")
sparkConf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true")
sparkConf.set("spark.rdd.compress", "true")
Кэширование данных и настройка разделов не работает
preForUsersRdd.map(row =>(row._1,row._2)).persist().repartition(100).collect().foreach(row => {....})
Настройка памяти драйвера и памяти исполнителя также не работает
Я знаю, что метод сбора добавляет данные в память, которая подвержена ошибкам из-за большого объема данных
Как это сделатьЯ оптимизирую свой код, как мне использовать метод сбора.Список буфер пуст без метода сбора.Недостаточно памяти с методом сбора
PreUserProduct в результате sparkMllib.ALS.Возвращает RecomProductsForUsers, в результате получается форма
[(Int, Array[Rating])] objects, where every tuple contains a userID and an array of rating objects which contains the same userId, recommended productID and a "score" in the rating field.
Итак, мне нужно выполнить итерацию, обработать и получить нужный формат (useid, product1, product2, .....) с помощью foreach, listbuffer,собирать и тд., а потом через ск.MakeRDD (listbuffer), получить RDD, написать redis для использования
Метод сбора состоит в сборе данных в драйвере, который подвержен ошибкам