Вы можете использовать встроенные в Spark функции Window вместе с when/otherwise
для ваших конкретных условий без необходимости использования UDF / UDAF. Для простоты размер скользящего окна уменьшен до 4 в следующем примере с фиктивными данными:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = (1 to 2).flatMap(i => Seq.tabulate(8)(j => (i, i * 10.0 + j))).
toDF("id", "amount")
val slidingWin = 4
val winSpec = Window.partitionBy($"id").rowsBetween(-(slidingWin - 1), 0)
df.
withColumn("slidingCount", count($"amount").over(winSpec)).
withColumn("slidingAvg", when($"slidingCount" < slidingWin, 0.0).
otherwise(avg($"amount").over(winSpec))
).show
// +---+------+------------+----------+
// | id|amount|slidingCount|slidingAvg|
// +---+------+------------+----------+
// | 1| 10.0| 1| 0.0|
// | 1| 11.0| 2| 0.0|
// | 1| 12.0| 3| 0.0|
// | 1| 13.0| 4| 11.5|
// | 1| 14.0| 4| 12.5|
// | 1| 15.0| 4| 13.5|
// | 1| 16.0| 4| 14.5|
// | 1| 17.0| 4| 15.5|
// | 2| 20.0| 1| 0.0|
// | 2| 21.0| 2| 0.0|
// | 2| 22.0| 3| 0.0|
// | 2| 23.0| 4| 21.5|
// | 2| 24.0| 4| 22.5|
// | 2| 25.0| 4| 23.5|
// | 2| 26.0| 4| 24.5|
// | 2| 27.0| 4| 25.5|
// +---+------+------------+----------+
За замечание в разделе комментариев, я включаю решение через UDF ниже в качестве альтернативы:
def movingAvg(n: Int) = udf{ (ls: Seq[Double]) =>
val (avg, count) = ls.takeRight(n).foldLeft((0.0, 1)){
case ((a, i), next) => (a + (next-a)/i, i + 1)
}
if (count <= n) 0.0 else avg // Expand/Modify this for specific requirement
}
// To apply the UDF:
df.
withColumn("average", movingAvg(slidingWin)(collect_list($"amount").over(winSpec))).
show
Обратите внимание, что в отличие от sum
или count
, collect_list
игнорирует rowsBetween()
и генерирует многораздельные данные, которые потенциально могут быть очень большими для передачи в UDF (отсюда и необходимость takeRight()
). Если вычисленных окон sum
и count
достаточно для того, что требуется для вашего конкретного требования, рассмотрите возможность передачи их в UDF.
В целом, особенно если имеющиеся данные уже в формате DataFrame, они будут работать и масштабироваться лучше, используя встроенный API DataFrame, чтобы воспользоваться преимуществами оптимизации механизма исполнения Spark, чем использование пользовательских UDF / UDAF. Возможно, вам будет интересно прочитать эту статью о преимуществах API DataFrame / Dataset по сравнению с UDF / UDAF.