Сбой в Spark-SQL для оконной функции - PullRequest
0 голосов
/ 13 октября 2018

При использовании Spark 2.3.2 и Spark-SQL следующий запрос 'b' завершается неудачно:

import spark.implicits._

val dataset = Seq((30, 2.0), (20, 3.0), (19, 20.0)).toDF("age", "size")

import functions._
val a0 = dataset.withColumn("rank", rank() over Window.partitionBy('age).orderBy('size))
val a1 = a0.agg(avg('rank))
//a1.show()
//OK

//same thing but in one expression, crashes:
val b = dataset.agg(functions.avg(functions.rank().over(Window.partitionBy('age).orderBy('size))))

AFAIK, это довольно странно, но это допустимый запрос SQL:

  • Я определяю столбец, который является результатом оконной функции
  • , затем берю среднее значение

Выполнение с использованием промежуточного столбца работает, но выполняется в одномвыражение приводит к сбою катализатора с переполнением стека:

Exception in thread "main" java.lang.StackOverflowError
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1.apply(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1.apply(TreeNode.scala:109)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.find(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1$$anonfun$apply$1.apply(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1$$anonfun$apply$1.apply(TreeNode.scala:109)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1.apply(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1.apply(TreeNode.scala:109)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.find(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1$$anonfun$apply$1.apply(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1$$anonfun$apply$1.apply(TreeNode.scala:109)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1.apply(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$find$1.apply(TreeNode.scala:109)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.find(TreeNode.scala:109)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$hasWindowFunction(Analyzer.scala:1757)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$71.apply(Analyzer.scala:1781)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$71.apply(Analyzer.scala:1781)
at scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
at scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.partition(TraversableLike.scala:314)
at scala.collection.AbstractTraversable.partition(Traversable.scala:104)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$extract(Analyzer.scala:1781)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$28.applyOrElse(Analyzer.scala:1950)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$28.applyOrElse(Analyzer.scala:1925)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) [...]

Это известная проблема?Я не уверен на 100%, что мой запрос верен, но, по крайней мере, он не должен приводить к сбою катализатора, поскольку он падает даже до того, как я смогу оценить свой запрос

1 Ответ

0 голосов
/ 15 октября 2018

avg () принимает имя столбца в качестве аргумента, но rank () передает фактические данные столбца, поэтому он не может найти имя столбца.

Он работает так же, как:

dataset.select( rank().over(Window.partitionBy("age").orderBy("size")).as("rank_XX") 
     ).
     agg(avg("rank_XX")).show() 

Выход:

+------------+                                                                  
|avg(rank_XX)|
+------------+
|         1.0|
+------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...