Предположим, что ваши данные "123456! ABCDE: 1_8; 5_10 # BCDEF: 1_3; 7_11" и "123456! ABCDE: 1_1; 7_2 # BCDEF: 1_2; 7_11", поэтому мы используем "!"чтобы получить свой идентификатор пользователя "123456"
rdd.map{f=>
val userID = f.split("!")(0)
val items = f.split("!")(1).split("#")
var result = List[Array[String]]()
for (item <- items){
val topicID = item.split(":")(0)
for (topicTypeValue <- item.split(":")(1).split(";") ){
println(topicTypeValue);
if (topicTypeValue.split("_")(0)=="1"){result = result:+Array(topicID,topicTypeValue.split("_")(1),"1") }
}
}
(userID,result)
}
.flatMapValues(x=>x).filter(f=>f._2.length==3)
.map{f=>( (f._1,f._2(0)),(f._2(1).toInt,f._2(2).toInt) )}
.reduceByKey{case(x,y)=> (x._1+y._1,x._2+y._2) }
.map(f=>(f._1._1,(f._1._2,f._2._1,f._2._2))) // (userID, (TopicID,valueSum,frequences) )
Выходные данные ("12345", ("ABCDE", 9,2)), ("12345", ("BCDEF", 5,2))немного отличающийся от вашего вывода, вы можете сгруппировать этот результат, если вам действительно нужно («12345», («ABCDE», 9,2), («BCDEF», 5,2))