Я сейчас пытаюсь вычислить настраиваемую агрегацию в скользящем окне и Spark продолжает выдает мне следующую ошибку :
Я был бы очень благодарен всем за хорошее решение!
Контекст
Чем я занимаюсь
Я определяю собственный агрегатор, используя org.apache.spark.sql.expressions.Aggregator
, и я пытаюсь применить:
case class MySum(colName: String) extends Aggregator[Row, Double, Double] {
def zero: Double = 0d
def reduce(acc: Double, row: Row): Double = acc * 0.78 + 0.21 * row.getAs[Double](colName)
def merge(acc1: Double, acc2: Double): Double = acc1 + acc2
def finish(acc: Double): Double = acc
def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
Настройки
искра: 3.0.0, scala: 2.12.10
Проблема
Window function => fails
val spark = SparkSession.builder.getOrCreate
import spark.implicits._
val df = Seq(
(1L, 2.569d),
(2L, 5.89d),
(3L, 4.28d),
(4L, 2.15d),
(5L, 6.43d),
(6L, 8.92d),
(7L, 5.86d),
(8L, 1.65d),
(9L, 2.28d)
).toDF("order", "price")
val win = Window.orderBy("order").rangeBetween(0, 1)
df.withColumn("new_column", MySum("price").toColumn.as("new_column").over(win))
org. apache .spark. sql .AnalysisException: последовательность группирующих выражений пуста, а 'b
' не является агрегатом функция. Wrap '(mysum (boundreference (), value
, assertnotnull (cast (value as double)), boundreference ()) OVER (ROWS BETWEEN TURRENT ROW AND 1 FOLLOWING) AS c
)' в оконных функциях или оберните 'b
' в first () (или first_value), если вам все равно, какое значение вы получите. ;;
Стоит упомянуть
"simple agg" работает правильно :
df.groupBy("order").agg(MySum("price").toColumn.as("new_column")).show()
#+-----+----------+
#|order|new_column|
#+-----+----------+
#| 7| 2.93|
#| 6| 4.46|
#| 9| 1.14|
#| 5| 3.215|
#| 1| 1.2845|
#| 3| 2.14|
#| 8| 0.825|
#| 2| 2.945|
#| 4| 1.075|
#+-----+----------+
Как заставить работать собственный агрегатор с оконной функцией?