Spark UserDefinedAggregateFunction генерирует java.lang.InternalError: искаженное имя класса - PullRequest
0 голосов
/ 24 октября 2018

[Решено!]

Я использую Spark 2.3.1 и Scala 2.12 и следую примеру с этой страницы: http://spark.apache.org/docs/latest/sql-programming-guide.html

Сначала вот файл employees.json:

{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}

Затем scala-код:

/**
 * Untyped user-defined aggregate that averages a column of long values.
 *
 * The aggregation buffer has two long slots: slot 0 ("sum") accumulates the
 * input values and slot 1 ("count") counts how many non-null inputs were seen.
 */
object MyAverage extends UserDefinedAggregateFunction {

  /** Schema of the aggregate's argument: one LongType column named "inputColumn". */
  override def inputSchema: StructType =
    StructType(Seq(StructField("inputColumn", LongType)))

  /** Schema of the intermediate buffer: a long "sum" followed by a long "count". */
  override def bufferSchema: StructType =
    StructType(Seq(StructField("sum", LongType), StructField("count", LongType)))

  /** Type of the final result produced by [[evaluate]]. */
  override def dataType: DataType = DoubleType

  /** Declares that the same input always yields the same output. */
  override def deterministic: Boolean = true

  /** Resets both buffer slots (sum and count) to zero. */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0L)
    buffer.update(1, 0L)
  }

  /** Folds one input row into the buffer; rows whose first column is null are ignored. */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val hasValue = !input.isNullAt(0)
    if (hasValue) {
      val runningSum = buffer.getLong(0)
      val runningCount = buffer.getLong(1)
      buffer.update(0, runningSum + input.getLong(0))
      buffer.update(1, runningCount + 1)
    }
  }

  /** Adds the sum and count held in `buffer2` onto `buffer1`, slot by slot. */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val mergedSum = buffer1.getLong(0) + buffer2.getLong(0)
    val mergedCount = buffer1.getLong(1) + buffer2.getLong(1)
    buffer1.update(0, mergedSum)
    buffer1.update(1, mergedCount)
  }

  /**
   * Computes the average as sum / count.
   *
   * The division is floating-point, so a zero count does not throw.
   */
  override def evaluate(buffer: Row): Double = {
    val total = buffer.getLong(0)
    val count = buffer.getLong(1)
    total.toDouble / count
  }
}
// Expose the aggregate to SQL under the name "myAverage".
spark.udf.register("myAverage", MyAverage)

// Load the input records and publish them as the "employees" temporary view.
val employeesReader = spark.read
val df = employeesReader.json("employees.json")
df.createOrReplaceTempView("employees")
df.show()

// Run the aggregate over the salary column and print the single-row result.
val averageSalaryQuery = "SELECT myAverage(salary) as average_salary FROM employees"
val result = spark.sql(averageSalaryQuery)
result.show()

При запуске этой программы через spark-submit выводится:

18/10/24 18:56:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

Exception in thread "main" java.lang.InternalError: Malformed class name
        at java.lang.Class.getSimpleName(Class.java:1330)
        at org.apache.spark.sql.execution.aggregate.ScalaUDAF.toString(udaf.scala:448)
        at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.resultAttribute$lzycompute(interfaces.scala:97)
        at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.resultAttribute(interfaces.scala:95)
        at org.apache.spark.sql.catalyst.planning.PhysicalAggregation$$anonfun$8$$anonfun$apply$2.applyOrElse(patterns.scala:243)
        at org.apache.spark.sql.catalyst.planning.PhysicalAggregation$$anonfun$8$$anonfun$apply$2.applyOrElse(patterns.scala:238)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)

Где это исправить?

...