Spark - запросить фрейм данных на основе значений из столбца в другом фрейме данных - PullRequest
0 голосов
/ 24 октября 2018

Я могу придумать несколько неправильных способов сделать это, но я пытаюсь найти лучший способ сделать это.Позвольте мне объяснить:

  Table A
  id  topScore
  A   13
  B   24
  C   15

  Table B
  id  score
  A   6
  A   3
  A   18
  A   8
  B   8
  B   18
  B   26
  B   12
  C   1
  C   4
  C   20
  C   9

Я хочу иметь возможность получить максимальную оценку из таблицы B без превышения оценки для этого идентификатора в таблице A.

Конечный результат должен выглядеть следующим образом:

  Table c
  id  score
  A   8
  B   18
  C   9

Итак, я думаю, все, что я хочу сделать, это в основном фильтровать DF таблицы B, говоря.Для идентификатора получите MAX (TableB.score), где оценка

Ответы [ 3 ]

0 голосов
/ 24 октября 2018

Другой подход с использованием оконных функций.

scala>  val dfa = Seq(("A","13"),("B","24"),("C","15")).toDF("id","topscore").withColumn("topscore",'topscore.cast("int")).withColumn("c",lit("a"))
dfa: org.apache.spark.sql.DataFrame = [id: string, topscore: int, c: string]

scala> val dfb = Seq(("A","6"),  ("A","3"),  ("A","18"),  ("A","8"), ("B","8"),  ("B","18"),  ("B","26"),  ("B","12"),  ("C","1"),  ("C","4"),  ("C","20"),  ("C","9")).toDF("id","score").withColumn("score",'score.cast("int")).withColumn("c",lit("b"))
dfb: org.apache.spark.sql.DataFrame = [id: string, score: int, c: string]

scala> dfa.unionAll(dfb).withColumn("x",rank().over(Window.partitionBy('c,'id) orderBy('topscore).desc )).filter('c==="b" and 'x===2).show
+---+--------+---+---+
| id|topscore|  c|  x|
+---+--------+---+---+
|  A|       8|  b|  2|
|  B|      18|  b|  2|
|  C|       9|  b|  2|
+---+--------+---+---+


scala>
0 голосов
/ 24 октября 2018

Соедините обе таблицы по «id», отфильтруйте «tableB» по «tableA.topScore», а затем возьмите «max»:

val tableA = List(("A", 13), ("B", 24), ("C", 15)).toDF("id", "topScore")
val tableB = List(("A", 6), ("A", 3), ("A", 18), ("A", 8),
  ("B", 8), ("B", 18), ("B", 26), ("B", 12),
  ("C", 1), ("C", 4), ("C", 20), ("C", 9)).toDF("id", "topScore")

// action
val result = tableA.alias("a")
  .join(tableB.alias("b"), Seq("id"), "left")
  .where($"a.topScore" > $"b.topScore" || $"b.topScore".isNull)
  .groupBy("a.id").agg(max($"b.topScore").alias("topScore"))

result.show(false)

Вывод:

+---+--------+
|id |topScore|
+---+--------+
|A  |8       |
|B  |18      |
|C  |9       |
+---+--------+
0 голосов
/ 24 октября 2018

Надеюсь, этот фрагмент поможет вам:

scala> val tableA = spark.sparkContext.parallelize(List(
     | ("A",13),
     | ("B",24),
     | ("C",15))).toDF("id","topScore")
tableA: org.apache.spark.sql.DataFrame = [id: string, topScore: int]

scala> val tableB = spark.sparkContext.parallelize(List(
     | ("A",6),
     | ("A",3),
     | ("A",18),
     | ("A",8),
     | ("B",8),
     | ("B",18),
     | ("B",26),
     | ("B",12),
     | ("C",1),
     | ("C",4),
     | ("C",20),
     | ("C",9))).toDF("id","topScore")
tableB: org.apache.spark.sql.DataFrame = [id: string, topScore: int]


scala> val tableC = tableB.withColumnRenamed("topScore","topScoreB").withColumnRenamed("id","id1")
scala> tableC.show
+---+---------+
|id1|topScoreB|
+---+---------+
|  A|        6|
|  A|        3|
|  A|       18|
|  A|        8|
|  B|        8|
|  B|       18|
|  B|       26|
|  B|       12|
|  C|        1|
|  C|        4|
|  C|       20|
|  C|        9|
+---+---------+


scala> tableA.join(tableC, tableA("id")===tableC("id1"), "left").filter($"topScore" >= $"topScoreB").select("id","topScoreB").groupBy("id").agg(max($"topScoreB")).show
+---+--------------+
| id|max(topScoreB)|
+---+--------------+
|  B|            18|
|  C|             9|
|  A|             8|
+---+--------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...