Spark rdd использует метод collect для генерации ошибки OutOfMemoryError - PullRequest
0 голосов
/ 16 мая 2019

Мне нужно перебрать rdd, вынуть данные для обработки и поместить их в буфер списка.Когда я использую метод сбора, ошибка выглядит следующим образом: java.lang.OutOfMemoryError: Превышен лимит накладных расходов GC.В противном случае буфер списка пуст.

Это мой код:

val recoList = new ListBuffer[(String,String)]()
preUserProdcut.map(row =>(row._1,row._2))
  .foreach(row => {
    val recoItemList = new ListBuffer[String]()
    val userId = row._1
    val size = row._2.size
    for (i <- 0 to(size -1)){
      recoItemList.append(int2ItemIdMap.getOrElse(row._2(i).product,""))
    }
    recoList.append((RecoModelEnum.UserCF_fqd_01.modelId + "_" +userId.toString,recoItemList.mkString(",")))
    println("############" + recoList.size)  //1. here length is ok
  })

println("############" + recoList.size)     //2. this code doesn't be executed

Когда я использую метод сбора, код отлично работает на небольшом наборе данных

preUserProdcut.map(row =>(row._1,row._2))
  .collect()
  .foreach(row => {.........})
println("############" + recoList.size)     //here length is ok

Однако при большом количестве данных, код ошибки для запуска на искровом кластере, ошибка выглядит следующим образом: Java.Lang.OutOfMemoryError: Превышен лимит накладных расходов GC.

Я попробовал несколько способов, а именно:

sparkConf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER")
sparkConf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true")
sparkConf.set("spark.rdd.compress", "true")

Кэширование данных и настройка разделов не работает

preForUsersRdd.map(row =>(row._1,row._2)).persist().repartition(100).collect().foreach(row => {....})

Настройка памяти драйвера и памяти исполнителя также не работает

Я знаю, что метод сбора добавляет данные в память, которая подвержена ошибкам из-за большого объема данных

Как это сделатьЯ оптимизирую свой код, как мне использовать метод сбора.Список буфер пуст без метода сбора.Недостаточно памяти с методом сбора

PreUserProduct в результате sparkMllib.ALS.Возвращает RecomProductsForUsers, в результате получается форма

[(Int, Array[Rating])] objects, where every tuple contains a userID and an array of rating objects which contains the same userId, recommended productID and     a "score" in the rating field.

Итак, мне нужно выполнить итерацию, обработать и получить нужный формат (useid, product1, product2, .....) с помощью foreach, listbuffer,собирать и тд., а потом через ск.MakeRDD (listbuffer), получить RDD, написать redis для использования

Метод сбора состоит в сборе данных в драйвере, который подвержен ошибкам

1 Ответ

0 голосов
/ 16 мая 2019

Я не понимаю, почему вам нужно использовать ListBuffer.

Вы должны использовать write вместо collect.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...