Проблема с окном Spark поверх Group By - PullRequest
0 голосов
/ 06 мая 2020

Я хочу заполнить объединенное окно с зернистостью, отличной от выбранной группы. Используя Scala sql.

Select c1,c2,c3,max(c4),max(c5),
Max(c4) over (partition by c1,c2,c3),
Avg(c5) over (partition by c1,c2,c3)
From temp_view 
Group by c1,c2,c3

Получение Ошибка , говоря:

c4 and c5 not being part of Group by or use first().

1 Ответ

0 голосов
/ 25 мая 2020

Как я сказал в комментарии, GroupBy и PartitionBy имеют одну и ту же цель в нескольких аспектах. Если вы используете GroupBy, тогда вся агрегация будет работать только с этими GroupBy столбцами. То же самое происходит, когда вы используете partition by. Единственное существенное различие между ними: groupBy Уменьшает число. записей и В select нам нужно использовать только столбцы, которые используются в группе by Но в ParitionBy Количество записей не уменьшится. Вместо этого он добавит один дополнительный агрегированный столбец, и в select мы можем использовать N no. колонн.

Для вашей проблемы вы используете столбцы c1, c2, c3 в Group By и используете Max (c4), AVG (c5) с разделением по, поэтому это дает вам ошибку. Для вашего случая вы можете использовать любой из следующих запросов:

Select c1,c2,c3,max(c4),max(c5)
From temp_view 
Group by c1,c2,c3

OR

Select c1,c2,c3,
Max(c4) over (partition by c1,c2,c3),
Avg(c5) over (partition by c1,c2,c3)
From temp_view

Ниже приведен пример, который даст вам четкое изображение:

scala> spark.sql("""SELECT * from table""").show()
+---+----------------+-------+------+
| ID|            NAME|COMPANY|SALARY|
+---+----------------+-------+------+
|  1|    Gannon Chang|    ABC|440993|
|  2|   Hashim Morris|    XYZ| 49140|
|  3|       Samson Le|    ABC|413890|
|  4|   Brandon Doyle|    XYZ|384118|
|  5|    Jacob Coffey|    BCD|504819|
|  6|   Dillon Holder|    ABC|734086|
|  7|Salvador Vazquez|    NGO|895082|
|  8|    Paki Simpson|    BCD|305046|
|  9|   Laith Stewart|    ABC|943750|
| 10|  Simon Whitaker|    NGO|561896|
| 11|   Denton Torres|    BCD| 10442|
| 12|Garrison Sellers|    ABC| 53024|
| 13| Theodore Bolton|    TTT|881521|
| 14|   Kamal Roberts|    TTT|817422|
+---+----------------+-------+------+

//You can only use column to select that is in group by
scala> spark.sql("""SELECT COMPANY, max(SALARY) from table group by COMPANY""").show()
+-------+-----------+
|COMPANY|max(SALARY)|
+-------+-----------+
|    NGO|     895082|
|    BCD|     504819|
|    XYZ|     384118|
|    TTT|     881521|
|    ABC|     943750|
+-------+-----------+

//It will give error if you select all column or column other than Group By

scala> spark.sql("""SELECT *, max(SALARY) from table group by COMPANY""").show()
org.apache.spark.sql.AnalysisException: expression 'table.`ID`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [COMPANY#94], [ID#92, NAME#93, COMPANY#94, SALARY#95L, max(SALARY#95L) AS max(SALARY)#213L]
+- SubqueryAlias table
   +- Relation[ID#92,NAME#93,COMPANY#94,SALARY#95L] parquet

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:187)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided


//But you can select all columns with partition by
scala> spark.sql("""SELECT *, Max(SALARY) over (PARTITION BY COMPANY) as Max_Salary from table""").show()
+---+----------------+-------+------+----------+
| ID|            NAME|COMPANY|SALARY|Max_Salary|
+---+----------------+-------+------+----------+
|  7|Salvador Vazquez|    NGO|895082|    895082|
| 10|  Simon Whitaker|    NGO|561896|    895082|
|  5|    Jacob Coffey|    BCD|504819|    504819|
|  8|    Paki Simpson|    BCD|305046|    504819|
| 11|   Denton Torres|    BCD| 10442|    504819|
|  2|   Hashim Morris|    XYZ| 49140|    384118|
|  4|   Brandon Doyle|    XYZ|384118|    384118|
| 13| Theodore Bolton|    TTT|881521|    881521|
| 14|   Kamal Roberts|    TTT|817422|    881521|
|  1|    Gannon Chang|    ABC|440993|    943750|
|  3|       Samson Le|    ABC|413890|    943750|
|  6|   Dillon Holder|    ABC|734086|    943750|
|  9|   Laith Stewart|    ABC|943750|    943750|
| 12|Garrison Sellers|    ABC| 53024|    943750|
+---+----------------+-------+------+----------+
...