Как использовать собственный агрегатор искр с оконной функцией - PullRequest
0 голосов
/ 02 августа 2020

Я сейчас пытаюсь вычислить настраиваемую агрегацию в скользящем окне и Spark продолжает выдает мне следующую ошибку :

Я был бы очень благодарен всем за хорошее решение!

Контекст

Чем я занимаюсь

Я определяю собственный агрегатор, используя org.apache.spark.sql.expressions.Aggregator, и я пытаюсь применить:

case class MySum(colName: String) extends Aggregator[Row, Double, Double] {

    def zero: Double = 0d
    def reduce(acc: Double, row: Row): Double = acc * 0.78 + 0.21 * row.getAs[Double](colName)

    def merge(acc1: Double, acc2: Double): Double = acc1 + acc2
    def finish(acc: Double): Double = acc

    def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

Настройки

искра: 3.0.0, scala: 2.12.10

Проблема

Window function => fails

val spark = SparkSession.builder.getOrCreate
import spark.implicits._
val df = Seq(
      (1L, 2.569d),
      (2L, 5.89d),
      (3L, 4.28d),
      (4L, 2.15d),
      (5L, 6.43d),
      (6L, 8.92d),
      (7L, 5.86d),
      (8L, 1.65d),
      (9L, 2.28d)
).toDF("order", "price")

val win = Window.orderBy("order").rangeBetween(0, 1)
df.withColumn("new_column", MySum("price").toColumn.as("new_column").over(win))

org. apache .spark. sql .AnalysisException: последовательность группирующих выражений пуста, а 'b' не является агрегатом функция. Wrap '(mysum (boundreference (), value, assertnotnull (cast (value as double)), boundreference ()) OVER (ROWS BETWEEN TURRENT ROW AND 1 FOLLOWING) AS c)' в оконных функциях или оберните 'b' в first () (или first_value), если вам все равно, какое значение вы получите. ;;

Стоит упомянуть

"simple agg" работает правильно :

df.groupBy("order").agg(MySum("price").toColumn.as("new_column")).show()

#+-----+----------+
#|order|new_column|
#+-----+----------+
#|    7|      2.93|
#|    6|      4.46|
#|    9|      1.14|
#|    5|     3.215|
#|    1|    1.2845|
#|    3|      2.14|
#|    8|     0.825|
#|    2|     2.945|
#|    4|     1.075|
#+-----+----------+

Как заставить работать собственный агрегатор с оконной функцией?

...