Как сгруппировать и агрегировать несколько полей с использованием combByKey RDD? - PullRequest
0 голосов
/ 13 ноября 2018

У меня есть пример файла, который я пытаюсь выяснить для данного поля общее число другого поля и его количество и список значений из другого поля, используя combineByKey.Я пытаюсь понять combineByKey, то же самое требование, которое я понял из этого вопроса с использованием aggregateByKey, теперь я хотел бы понять combineByKey.

Я попробовал приведенный ниже код, которыйтак же, как aggregateByKey, но я получаю ошибку несоответствия типов.Я не уверен, правильны ли мои типы для createCombiner или mergeValue или mergeCombiner.Пожалуйста, помогите мне получить хорошее понимание combineByKey.

Пример данных:

44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79 

Код для combineByKey:

val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))

def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)

def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
  println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
  (acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}

def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
  println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
  (acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}

rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)

Сообщение об ошибке:

error: type mismatch;
found   : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
                 ^

Ожидаемый результат:

customerid, (orderids,..,..,....), totalamount, number of orderids

Используя предоставленные данные примера, он будет:

(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))

Несоответствие указывает на createCombiner.Может ли кто-нибудь помочь мне понять, combineByKey?

Ответы [ 3 ]

0 голосов
/ 13 ноября 2018

Вот подпись combByKey:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

значение слияния имеет тип (C, V) => C

Где C должно быть ((Seq[Int],Double, Int), Int), а V должно быть (Seq[Int],Double, Int)

Ваш метод mergeValue имеет типы C (Seq[Int],Double,Int) и V (Int,Float)

Тип mergeCombiner также неверен.

Это должно быть (C, C) => C, где C равно ((Seq[Int],Double, Int), Int)

0 голосов
/ 14 ноября 2018

Проблема здесь заключается в функции createCombiner. Посмотрите на combineByKey:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

Проще говоря, C - это формат, который вы хотите получить ((Seq[Int], Double, Int)) и V, с которого вы начинаете ((Int, Double)). Здесь я изменил Float на Double, поскольку именно это обычно используется в Spark. Это означает, что функция createCombiner должна выглядеть следующим образом:

def createCombiner = (tuple: (Int, Double)) => (Seq(tuple._1), tuple._2, 1)

И mergeValue, и mergeCombiner выглядят нормально, однако вы не увидите никаких операторов печати в Spark, если выполняете код в кластере (см .: Spark теряет println () в stdout ) .

0 голосов
/ 13 ноября 2018

Я не знаком со Spark.

Я надеюсь, что это может помочь вам.

val array = Array((44,8602,37.19),(44,8331,99.19),(44,1919,39.54),(44,2682,41.88),(44,7366,66.54),(44,3405,81.09),(44,9957,94.79))

array.groupBy(_._1).map(e => (e._1, e._2.map(_._2).toList, e._2.map(_._3).sum))
//res1: scala.collection.immutable.Iterable[(Int, List[Int], Double)] = List((44,List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.21999999999997))

Я вижу, ваша ошибка связана с

def createCombiner = (tuple: (Seq [Int]), Double, Int)) => (tuple, 1)

Я думаю, createCombiner должно взять несколько Seq кортежей и вернуть кортежи Int и Seq (groupby)

def createCombiner = (tuple: Seq [(Int, Int, Double)]) => tuple.groupBy (_._ 1)

Надеюсь, это поможет.

...