Если ваш inputRdd
имеет тип
inputRdd: org.apache.spark.rdd.RDD[(String, String, Int)]
Тогда вы можете достичь желаемого результата, просто используя один reduceByKey
как
inputRdd.map(x => (x._1, (List(x._2), x._3.toInt))).reduceByKey((x, y) => (x._1 ++ y._1, x._2+y._2))
, и вы можете сделать это с помощью aggregateByKey
as
inputRdd.map(x => (x._1, (List(x._2), x._3.toInt))).aggregateByKey((List.empty[String], 0))((x, y) => (x._1 ++ y._1, x._2+y._2), (x, y) => (x._1 ++ y._1, x._2+y._2))
DataFrame way
Еще лучший подход - использовать dataframe way.Вы можете преобразовать свой rdd в dataframe, просто применив .toDF("state", "city", "population")
, который должен дать вам
+-----+----+----------+
|state|city|population|
+-----+----+----------+
|A |A1 |1 |
|B |B1 |2 |
|C |C1 |3 |
|A |A2 |2 |
|A |A3 |3 |
|B |B2 |10 |
|C |C2 |5 |
+-----+----+----------+
После этого вы можете просто использовать groupBy
и применять встроенные функции агрегирования collect_list
и sum
как
import org.apache.spark.sql.functions._
inputDf.groupBy("state").agg(collect_list(col("city")).as("cityList"), sum("population").as("sumPopulation"))
, что должно дать вам
+-----+------------+-------------+
|state|cityList |sumPopulation|
+-----+------------+-------------+
|B |[B1, B2] |12 |
|C |[C1, C2] |8 |
|A |[A1, A2, A3]|6 |
+-----+------------+-------------+
Dataset
почти то же самое, но поставляется с дополнительной безопасностью типа