У меня есть пример файла, который я пытаюсь выяснить для данного поля общее число другого поля и его количество и список значений из другого поля, используя combineByKey
.Я пытаюсь понять combineByKey
, то же самое требование, которое я понял из этого вопроса с использованием aggregateByKey
, теперь я хотел бы понять combineByKey
.
Я попробовал приведенный ниже код, которыйтак же, как aggregateByKey
, но я получаю ошибку несоответствия типов.Я не уверен, правильны ли мои типы для createCombiner
или mergeValue
или mergeCombiner
.Пожалуйста, помогите мне получить хорошее понимание combineByKey
.
Пример данных:
44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79
Код для combineByKey
:
val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))
def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)
def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
(acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}
def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
(acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
Сообщение об ошибке:
error: type mismatch;
found : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
^
Ожидаемый результат:
customerid, (orderids,..,..,....), totalamount, number of orderids
Используя предоставленные данные примера, он будет:
(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))
Несоответствие указывает на createCombiner
.Может ли кто-нибудь помочь мне понять, combineByKey
?