Пара ключ-значение RDD с составным значением - PullRequest
0 голосов
/ 28 мая 2018

У меня здесь есть набор игрушечных данных, для которого мне нужно вычислить список городов в каждом штате и населении этого штата (сумма населения всех городов в этом штате) Данные

Я хочу сделать это, используя RDD без использования groupByKey и соединений.Мой подход до сих пор:

В этом подходе я использовал 2 отдельные пары ключ-значение и соединил их.

val rdd1=inputRdd.map(x=>(x._1,x._3.toInt))
val rdd2=inputRdd.map(x=>(x._1,x._2))
val popn_sum=rdd1.reduceByKey(_+_)
val list_cities=rdd2.reduceByKey(_++_)
popn_sum.join(list_cities).collect()

Можно ли получить один и тот же вывод только с одной парой ключ-значениеи без каких-либо присоединений.Я создал новую пару ключ-значение, но я не знаю, как поступить, чтобы получить тот же вывод, используя aggregateByKey или ReduceByKey с этим RDD:

val rdd3=inputRdd.map(x=>(x._1,(List(x._2),x._3))) 

Я новичок в поиске и хочу узнатьлучший способ получить этот вывод.

Array((B,(12,List(B1, B2))), (A,(6,List(A1, A2, A3))), (C,(8,List(C1, C2))))

Заранее спасибо

1 Ответ

0 голосов
/ 28 мая 2018

Если ваш inputRdd имеет тип

inputRdd: org.apache.spark.rdd.RDD[(String, String, Int)]

Тогда вы можете достичь желаемого результата, просто используя один reduceByKey как

inputRdd.map(x => (x._1, (List(x._2), x._3.toInt))).reduceByKey((x, y) => (x._1 ++ y._1, x._2+y._2))

, и вы можете сделать это с помощью aggregateByKey as

inputRdd.map(x => (x._1, (List(x._2), x._3.toInt))).aggregateByKey((List.empty[String], 0))((x, y) => (x._1 ++ y._1, x._2+y._2), (x, y) => (x._1 ++ y._1, x._2+y._2))

DataFrame way

Еще лучший подход - использовать dataframe way.Вы можете преобразовать свой rdd в dataframe, просто применив .toDF("state", "city", "population"), который должен дать вам

+-----+----+----------+
|state|city|population|
+-----+----+----------+
|A    |A1  |1         |
|B    |B1  |2         |
|C    |C1  |3         |
|A    |A2  |2         |
|A    |A3  |3         |
|B    |B2  |10        |
|C    |C2  |5         |
+-----+----+----------+

После этого вы можете просто использовать groupBy и применять встроенные функции агрегирования collect_list и sum как

import org.apache.spark.sql.functions._
inputDf.groupBy("state").agg(collect_list(col("city")).as("cityList"), sum("population").as("sumPopulation"))

, что должно дать вам

+-----+------------+-------------+
|state|cityList    |sumPopulation|
+-----+------------+-------------+
|B    |[B1, B2]    |12           |
|C    |[C1, C2]    |8            |
|A    |[A1, A2, A3]|6            |
+-----+------------+-------------+

Dataset почти то же самое, но поставляется с дополнительной безопасностью типа

...