Определенный идентификатор работы в спарк занимает много времени, этот этап содержит агрегирование по ключевому методу - PullRequest
0 голосов
/ 15 декабря 2018

при отправке следующего кода в кластер

со следующей конфигурацией:

мы используем двухузловой кластерный узел1: оперативная память 24 ГБ, 16 ядер, 1 узел Tb2: 24 ГБОЗУ, 4 ядра, 1 ТБ

И размер данных составляет 2,4 ГБ

spark-submit --master yarn --deploy-mode client  --executor-cores 3 --num-executors 2 --executor-memory 14g   --class  trial_1 ./Item_Based.jar datas_3.txt  

Вышеуказанное задание разделяется на 5 идентификаторов заданий

Идентификатор задания (0-3) успешно прошли без проблем

на ИД задания 4, идентификатор этапа 12, соответствующий следующей строке

val profileRatings = profilePairs.aggregateByKey(initialSet)( addop, merge )

, на этой строке задания зажигания сильно колеблются без прогресса

def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf()
    conf.setAppName("New_Profile_2")  
    val sc = new SparkContext(conf)



    val data = sc.textFile(args(0))


    val ratings = data.map(x => x.split(" ")).map(x => (x(0).toInt , (x(1).toInt, 1 )))

   ratings.take(10).foreach(println) 

   val joinedRatings =  ratings.join(ratings)

   println("*******************Joined Ratings*****************************")

   joinedRatings.take(10).foreach(println) 


      println("**********************Unique Ids*************************") 

   val uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)


      uniqueJoinedRatings.take(10).foreach(println)



   println("******************mapped by Profiles***********")

   val profilePairs = uniqueJoinedRatings.map(makepairs)  //.partitionBy(new HashPartitioner(38))

   profilePairs.take(10).foreach(println)


   println("***********printing group by keys***************")



   val initialSet =    (0, 0)


   val addop =  (x: (Int,Int) , y:(Int,Int)) =>  (x._1+y._1,x._2+y._2)

    val merge = (p1 :(Int, Int), p2:(Int , Int)) => (p1._1+p2._1 , p1._2+p2._2 )

   val profileRatings = profilePairs.aggregateByKey(initialSet)( addop, merge )

    profileRatings.take(10).foreach(println)

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...