Spark: «Невозможно использовать UnspecifiedFrame. Это должно было быть преобразовано во время анализа. Пожалуйста, отправьте отчет об ошибке» - PullRequest
0 голосов
/ 09 мая 2018

Spark 2.3.0 со Scala 2.11. Я пытаюсь написать собственный агрегатор и запустить его через оконную функцию для этих документов , но получаю ошибку в заголовке. Вот урезанный пример. Это написано как тест FunSuite.

Я знаю, что в сообщении об ошибке говорится о том, что нужно отправить отчет об ошибке, но это такой простой пример, взятый почти непосредственно из документации, и мне интересно, есть ли в моем коде что-то, вызывающее ошибку. Интересно, если использование типа коллекции в качестве буфера как-то не поддерживается или необычно. Я не нашел других вопросов о переполнении стека с этой ошибкой.

package com.foobar;

import com.holdenkarau.spark.testing._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.{Aggregator, Window}
import org.scalatest.FunSuite
import org.slf4j.LoggerFactory

import scala.collection.mutable.ListBuffer

case class Foo(ID:Long, RANK:Int)

class Example extends FunSuite with DataFrameSuiteBase {
  test("example") {
    val data = Seq(
      Foo(5555, 1),
      Foo(5555, 2),
      Foo(8888, 1),
      Foo(8888, 2)
    )
    import spark.implicits._

    val df = sc.parallelize(data).toDF

    val w = Window.partitionBy("ID").orderBy("RANK")

    // The three types are: input, buffer, and output.
    object AggregateFoos extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
      // A zero value for this aggregation. Should satisfy the property that any b + zero = b
      override def zero: ListBuffer[Foo] = new ListBuffer[Foo]()

      override def reduce(b: ListBuffer[Foo], a: Foo): ListBuffer[Foo] = {
        b += a
      }

      // Merge two intermediate values
      override def merge(b1: ListBuffer[Foo], b2: ListBuffer[Foo]): ListBuffer[Foo] = {
        (b1 ++ b2).sortBy(b => b.RANK)
      }

      // Transform the output of the reduction
      override def finish(reduction: ListBuffer[Foo]): Boolean = {
        true // in real life there would be logic here
      }

      // Specifies the Encoder for the intermediate value type
      override def bufferEncoder: Encoder[ListBuffer[Foo]] = {
        ExpressionEncoder()
      }

      // Specifies the Encoder for the final output value type
      override def outputEncoder: Encoder[Boolean] = {
        ExpressionEncoder()
      }
    }

    val agg = AggregateFoos.toColumn.name("agg")

    df.select(df.col("*"), agg.over(w).as("agg")).show(false)
  }
}

Вот сообщение об ошибке:

org.apache.spark.sql.AnalysisException: невозможно разрешить '(PARTITION BY ID ORDER BY RANK ASC NULLS FIRST unspecifiedframe $ ())' из-за несоответствия типов данных: Невозможно использовать UnspecifiedFrame. Это должно было быть преобразовано во время анализа. Пожалуйста, отправьте отчет об ошибке. ;;

Ниже приводится полное исключение.

org.apache.spark.sql.AnalysisException: cannot resolve '(PARTITION BY `ID` ORDER BY `RANK` ASC NULLS FIRST unspecifiedframe$())' due to data type mismatch: Cannot use an UnspecifiedFrame. This should have been converted during analysis. Please file a bug report.;;
'Aggregate [ID#2L, RANK#3, aggregatefoos(AggregateFoos$@592e7718, None, None, None, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), if (isnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), true))) null else named_struct(ID, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), true)).ID, RANK, assertnotnull(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class Foo), true)).RANK), input[0, scala.collection.mutable.ListBuffer, true], None) AS value#9, mapobjects(MapObjects_loopValue1, MapObjects_loopIsNull1, StructField(ID,LongType,false), StructField(RANK,IntegerType,false), if (isnull(lambdavariable(MapObjects_loopValue1, MapObjects_loopIsNull1, StructField(ID,LongType,false), StructField(RANK,IntegerType,false), true))) null else newInstance(class Foo), input[0, array<struct<ID:bigint,RANK:int>>, true], Some(class scala.collection.mutable.ListBuffer)), input[0, boolean, false] AS value#8, BooleanType, false, 0, 0) windowspecdefinition(ID#2L, RANK#3 ASC NULLS FIRST, unspecifiedframe$()) AS agg#14]
+- AnalysisBarrier
      +- LocalRelation [ID#2L, RANK#3]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3296)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
  ... 49 elided

Насколько я могу судить, это какая-то внутренняя ошибка Spark, я в недоумении, любая помощь приветствуется.

...