У меня есть простая совокупная сумма UDAF.
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class sum extends UserDefinedAggregateFunction {
override def inputSchema: StructType = {
new StructType().add("val", DecimalType(28,14), nullable = true)
}
override def bufferSchema: StructType = StructType(StructField("sum", DecimalType(28,14)) :: Nil)
override def dataType: DataType = DecimalType(28,14)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer) = buffer(0) = Decimal(0.0)
override def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = buffer.getAs[Decimal](0) + input.getAs[Decimal](0)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Decimal](0) + buffer2.getAs[Decimal](0)
}
override def evaluate(buffer: Row) = buffer.getAs[Decimal](0)
}
Я определил ее и использую так:
spark.udf.register("sum", new sum)
spark.sql(""" SELECT sum(Value) from Data """).show(100, false)
Но я получаю следующую ошибку
Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 143.0 failed 4 times, most recent failure: Lost task 0.3 in stage 143.0 (TID 232, 172.30.104.19, executor 0): java.lang.ClassCastException: java.math.BigDecimal incompatible with org.apache.spark.sql.types.Decimal
Я не уверен, откуда исходит тип java.math.BigDecimal
, похоже, что оба DecimaType и Десятичное число являются частью библиотеки org.apache.spark.sql
.