Scala UDAF - java .math.BigDecimal несовместимо с org. apache .spark. sql .types.Decimal - PullRequest
0 голосов
/ 17 апреля 2020

У меня есть простая совокупная сумма UDAF.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._


class sum extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = {
    new StructType().add("val", DecimalType(28,14), nullable = true)
  }

  override def bufferSchema: StructType = StructType(StructField("sum", DecimalType(28,14)) :: Nil)

  override def dataType: DataType = DecimalType(28,14)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer) = buffer(0) = Decimal(0.0)

  override def update(buffer: MutableAggregationBuffer, input: Row) = {
    buffer(0) = buffer.getAs[Decimal](0) + input.getAs[Decimal](0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Decimal](0) + buffer2.getAs[Decimal](0)
  }

  override def evaluate(buffer: Row) = buffer.getAs[Decimal](0)

}

Я определил ее и использую так:

spark.udf.register("sum", new sum)
spark.sql(""" SELECT sum(Value) from Data """).show(100, false)

Но я получаю следующую ошибку

Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 0 in stage 143.0 failed 4 times, most recent failure: Lost task 0.3 in stage 143.0 (TID 232, 172.30.104.19, executor 0): java.lang.ClassCastException: java.math.BigDecimal incompatible with org.apache.spark.sql.types.Decimal

Я не уверен, откуда исходит тип java.math.BigDecimal, похоже, что оба DecimaType и Десятичное число являются частью библиотеки org.apache.spark.sql.

...