[Решено!]
Я использую Spark 2.3.1 и Scala 2.12 и следую примеру с этой страницы: http://spark.apache.org/docs/latest/sql-programming-guide.html
Сначала — файл employees.json:
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
Затем код на Scala:
/**
 * Untyped user-defined aggregate that averages a single `Long` input column.
 *
 * The aggregation buffer has two `Long` slots: a running sum (slot 0) and a
 * count of non-null inputs (slot 1). The final result is their quotient,
 * returned as a `Double`.
 *
 * NOTE(review): the stack trace reported alongside this code fails inside
 * `Class.getSimpleName`, called from `ScalaUDAF.toString`. Presumably this
 * object was declared nested inside another object or class; declaring it at
 * the top level of the file is the likely remedy -- confirm against the
 * actual project layout.
 */
object MyAverage extends UserDefinedAggregateFunction {

  // Index of the running-sum slot in the aggregation buffer.
  private final val SumSlot = 0
  // Index of the row-count slot in the aggregation buffer.
  private final val CountSlot = 1

  /** Schema of the aggregate's input: one `Long` column. */
  def inputSchema: StructType =
    StructType(List(StructField("inputColumn", LongType)))

  /** Schema of the intermediate buffer: a `Long` sum followed by a `Long` count. */
  def bufferSchema: StructType =
    StructType(List(StructField("sum", LongType), StructField("count", LongType)))

  /** Type of the value produced by [[evaluate]]. */
  def dataType: DataType = DoubleType

  /** Reports this aggregate as deterministic. */
  def deterministic: Boolean = true

  /** Resets both buffer slots to zero. */
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(SumSlot, 0L)
    buffer.update(CountSlot, 0L)
  }

  /** Folds one input row into the buffer; rows whose value is null are skipped. */
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val hasValue = !input.isNullAt(0)
    if (hasValue) {
      buffer.update(SumSlot, buffer.getLong(SumSlot) + input.getLong(0))
      buffer.update(CountSlot, buffer.getLong(CountSlot) + 1)
    }
  }

  /** Adds the sum and count held in `buffer2` into `buffer1`. */
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(SumSlot, buffer1.getLong(SumSlot) + buffer2.getLong(SumSlot))
    buffer1.update(CountSlot, buffer1.getLong(CountSlot) + buffer2.getLong(CountSlot))
  }

  /**
   * Computes the final average as sum / count.
   *
   * There is no guard against a zero count; the floating-point division
   * then yields NaN rather than throwing.
   */
  def evaluate(buffer: Row): Double = {
    val total = buffer.getLong(SumSlot)
    val rows = buffer.getLong(CountSlot)
    total.toDouble / rows
  }
}
// Make the aggregate callable from SQL under the name used in the query below.
spark.udf.register("myAverage", MyAverage)

// Load the sample data, expose it as a temp view, and print it.
val employees = spark.read.json("employees.json")
employees.createOrReplaceTempView("employees")
employees.show()

// Run the aggregate over the salary column and print the single-row result.
val averageSalary = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
averageSalary.show()
При запуске этой программы через spark-submit выводится:
18/10/24 18:56:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-------+------+
| name|salary|
+-------+------+
|Michael| 3000|
| Andy| 4500|
| Justin| 3500|
| Berta| 4000|
+-------+------+
Exception in thread "main" java.lang.InternalError: Malformed class name
at java.lang.Class.getSimpleName(Class.java:1330)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.toString(udaf.scala:448)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.resultAttribute$lzycompute(interfaces.scala:97)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.resultAttribute(interfaces.scala:95)
at org.apache.spark.sql.catalyst.planning.PhysicalAggregation$$anonfun$8$$anonfun$apply$2.applyOrElse(patterns.scala:243)
at org.apache.spark.sql.catalyst.planning.PhysicalAggregation$$anonfun$8$$anonfun$apply$2.applyOrElse(patterns.scala:238)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
Как это исправить?