SparkSQL groupby для создания вложенных записей - PullRequest
0 голосов
/ 13 марта 2019

У меня есть информация в форме (очевидно, фальшивая, но служит цели):

| User | Country |
|------|---------|
| A    | Sweden  |
| A    | Sweden  |
| A    | London  |
| B    | Spain   |
| B    | Denmark |
| B    | Brazil  |
| C    | India   |

Это доступно в качестве кадра данных в искре.Я искал использовать spark (и, может быть, SparkSQL) для расчета карты частот для каждого пользователя.

(A => Map((Sweden, 2), (London, 1)))
(B => Map((Spain, 1), (Brazil, 1), (Denmark, 1)))
(C => Map((India, 1)))

До сих пор я достигал:

(A => (Sweden, 2))
(A => (London, 1))
(B => (Spain, 1))
(B => (Brazil, 1))
(B => (Denmark, 1))
(C => (India, 1))

с помощьюследующий запрос:

SELECT user, country, COUNT(country) as frequency
FROM information
GROUP BY user, country

, но проблема в том, что я получаю 6 строк вместо 3. Не знаю, куда идти.

Ответы [ 2 ]

2 голосов
/ 13 марта 2019

Вы можете применить еще один groupBy/agg для агрегирования struct(Country, Frequency), используя collect_list, как показано ниже:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("A", "Sweden"), ("A", "Sweden"), ("A", "London"),
  ("B", "Spain"), ("B", "Denmark"), ("B", "Brazil"),
  ("C", "India")
).toDF("User", "Country")

df.
  groupBy("User", "Country").agg(count("Country").as("Frequency")).
  groupBy("User").agg(collect_list(struct("Country", "Frequency")).as("Country_Counts")).
  show(false)
// +----+------------------------------------+
// |User|Country_Counts                      |
// +----+------------------------------------+
// |B   |[[Denmark,1], [Brazil,1], [Spain,1]]|
// |C   |[[India,1]]                         |
// |A   |[[London,1], [Sweden,2]]            |
// +----+------------------------------------+

Обратите внимание, что первое преобразование groupBy/agg эквивалентно вашему запросу SQL.

0 голосов
/ 13 марта 2019

После этого вам нужно сгруппировать по пользователю и собрать карту страны и периодичность.Приведенный ниже код должен быть полезен.

//Creating Test Data
val df = Seq(("A", "Sweden"), ("A", "Sweden"), ("A", "London"), ("B", "Spain"), ("B", "Denmark"), ("B", "Brazil"), ("C", "India"))
  .toDF("user", "country")

df.show(false)
+----+-------+
|user|country|
+----+-------+
|A   |Sweden |
|A   |Sweden |
|A   |London |
|B   |Spain  |
|B   |Denmark|
|B   |Brazil |
|C   |India  |
+----+-------+

df.registerTempTable("information")

val joinMap = spark.udf.register( "joinMap" , (values: Seq[Map[String,Long]]) => values.flatten.toMap )

val resultDF = spark.sql("""SELECT user, joinMap(collect_list(map(country, frequency))) as frequencyMap
                           |From ( SELECT user, country, COUNT(country) as frequency
                           |FROM information
                           |GROUP BY user, country ) A
                           |GROUP BY user""".stripMargin)

resultDF.show(false)
+----+------------------------------------------+
|user|frequencyMap                              |
+----+------------------------------------------+
|A   |Map(Sweden -> 2, London -> 1)             | 
|B   |Map(Spain -> 1, Denmark -> 1, Brazil -> 1)|
|C   |Map(India -> 1)                           |
+----+------------------------------------------+

Если вы хотите получить конечный результат как Map, используйте UDF.Без UDF вы получите его в виде списка карт.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...