при отправке следующего кода в кластер
со следующей конфигурацией:
мы используем двухузловой кластерный узел1: оперативная память 24 ГБ, 16 ядер, 1 узел Tb2: 24 ГБОЗУ, 4 ядра, 1 ТБ
И размер данных составляет 2,4 ГБ
spark-submit --master yarn --deploy-mode client --executor-cores 3 --num-executors 2 --executor-memory 14g --class trial_1 ./Item_Based.jar datas_3.txt
Вышеуказанное задание разделяется на 5 идентификаторов заданий
Идентификатор задания (0-3) успешно прошли без проблем
на ИД задания 4, идентификатор этапа 12, соответствующий следующей строке
val profileRatings = profilePairs.aggregateByKey(initialSet)( addop, merge )
, на этой строке задания зажигания сильно колеблются без прогресса
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
conf.setAppName("New_Profile_2")
val sc = new SparkContext(conf)
val data = sc.textFile(args(0))
val ratings = data.map(x => x.split(" ")).map(x => (x(0).toInt , (x(1).toInt, 1 )))
ratings.take(10).foreach(println)
val joinedRatings = ratings.join(ratings)
println("*******************Joined Ratings*****************************")
joinedRatings.take(10).foreach(println)
println("**********************Unique Ids*************************")
val uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
uniqueJoinedRatings.take(10).foreach(println)
println("******************mapped by Profiles***********")
val profilePairs = uniqueJoinedRatings.map(makepairs) //.partitionBy(new HashPartitioner(38))
profilePairs.take(10).foreach(println)
println("***********printing group by keys***************")
val initialSet = (0, 0)
val addop = (x: (Int,Int) , y:(Int,Int)) => (x._1+y._1,x._2+y._2)
val merge = (p1 :(Int, Int), p2:(Int , Int)) => (p1._1+p2._1 , p1._2+p2._2 )
val profileRatings = profilePairs.aggregateByKey(initialSet)( addop, merge )
profileRatings.take(10).foreach(println)
}