Spark: уменьшить / агрегировать по ключу - PullRequest
0 голосов
/ 13 сентября 2018

Я новичок в Spark и Scala, поэтому понятия не имею, как называется проблема такого типа (что делает ее поиск довольно сложной).

У меня есть данные следующей структуры:

[(date1, (name1, 1)), (date1, (name1, 1)), (date1, (name2, 1)), (date2, (name3, 1))]

В некотором смысле это должно быть уменьшено / агрегировано до:

[(date1, [(name1, 2), (name2, 1)]), (date2, [(name3, 1)])]

Я знаю, как сделать reduceByKey в списке пар ключ-значение, но эта конкретная проблема для меня загадка.

Заранее спасибо!

1 Ответ

0 голосов
/ 14 сентября 2018

Мои данные, но здесь пошагово:

val rdd1 = sc.makeRDD(Array( ("d1",("A",1)), ("d1",("A",1)), ("d1",("B",1)), ("d2",("E",1)) ),2)
val rdd2 = rdd1.map(x => ((x._1, x._2._1), x._2._2))
val rdd3 = rdd2.groupByKey
val rdd4 = rdd3.map{ 
   case (str, nums) => (str, nums.sum) 
}
val rdd5 =  rdd4.map(x => (x._1._1, (x._1._2, x._2))).groupByKey
rdd5.collect

возвращается:

res28: Array[(String, Iterable[(String, Int)])] = Array((d2,CompactBuffer((E,1))), (d1,CompactBuffer((A,2), (B,1))))

Лучше избегать использования groupByKey следующим образом:

val rdd1 = sc.makeRDD(Array( ("d1",("A",1)), ("d1",("A",1)), ("d1",("B",1)), ("d2",("E",1)) ),2)
val rdd2 = rdd1.map(x => ((x._1, x._2._1), (x._2._2))) // Need to add quotes around V part for reduceByKey
val rdd3 = rdd2.reduceByKey(_+_)
val rdd4 = rdd3.map(x => (x._1._1, (x._1._2, x._2))).groupByKey // Necessary Shuffle
rdd4.collect

Как я уже говорил в столбцах, это можно сделать с помощью DataFrames для структурированных данных, поэтому запустите это ниже:

// This above should be enough.
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val rddA = sc.makeRDD(Array( ("d1","A",1), ("d1","A",1), ("d1","B",1), ("d2","E",1) ),2)
val dfA = rddA.toDF("c1", "c2", "c3")

val dfB = dfA
   .groupBy("c1", "c2")
   .agg(sum("c3").alias("sum"))
dfB.show

возвращается:

+---+---+---+
| c1| c2|sum|
+---+---+---+
| d1|  A|  2|
| d2|  E|  1|
| d1|  B|  1|
+---+---+---+

Но вы можете сделать это примерно так, как указано выше для CompactBuffer.

import org.apache.spark.sql.functions.{col, udf}

case class XY(x: String, y: Long)
val xyTuple = udf((x: String, y: Long) => XY(x, y))

val dfC = dfB
         .withColumn("xy", xyTuple(col("c2"), col("sum")))
         .drop("c2")
         .drop("sum")

dfC.printSchema
dfC.show

// Then ... this gives you the CompactBuffer answer but from a DF-perspective
val dfD = dfC.groupBy(col("c1")).agg(collect_list(col("xy")))   
dfD.show

возврат - некоторые переименование требует и возможна сортировка:

---+----------------+
| c1|collect_list(xy)|
+---+----------------+
| d2|        [[E, 1]]|
| d1|[[A, 2], [B, 1]]|
+---+----------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...