В конце концов, я проанализировал, как экспоненциальная скользящая средняя реализована в кадрах данных pandas.Помимо рекурсивной формулы, которую я описал выше, и которую сложно реализовать в любой sql или оконной функции (потому что она рекурсивная), есть еще одна, подробно описанная в их системе отслеживания проблем :
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).
Учитывая это, и с дополнительной помощью по реализации spark из здесь , я получил следующую реализацию, которая примерно эквивалентна pandas_dataframe.ewm (span = window_size) .mean.() .
def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
val window = Window.partitionBy(partitionColumn)
val exponentialMovingAveragePrefix = "_EMA_"
val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
val alpha = 2.0 / (windowSize + 1)
val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
}
(adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
})
dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
.withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
.drop("row_nr")
}
(Я предполагаю, что тип столбца, для которого мне нужно вычислить экспоненциальное скользящее среднее, равен Double.)
Надеюсь, это поможет другим.