Искра Экспоненциальная скользящая средняя - PullRequest
0 голосов
/ 05 июня 2018

У меня есть данные о ценах временных рядов, с идентификатором, датой и ценой.

Мне нужно вычислить экспоненциальную скользящую среднюю для столбца цены и добавить его в качестве нового столбца в фрейм данных.

Ранее я использовал оконные функции Spark, и это выглядело какподходит для этого варианта использования, но учитывая формулу для EMA:

EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)

, где

multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now

, я немного запутался в том, как я могу получить доступ к предыдущему вычисленному значению встолбец, в то время как на самом деле окно над столбцом.С помощью простого скользящего среднего все просто, поскольку все, что вам нужно сделать, это вычислить новый столбец при усреднении элементов в окне:

var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))

Но кажется, что с EMA это немного сложнее, так как прикаждый шаг мне нужно предыдущее вычисленное значение.

Я также смотрел на Взвешенное скользящее среднее в Pyspark , но мне нужен подход для Spark / Scala и для EMA на 10 или 30 дней.

Есть идеи?

1 Ответ

0 голосов
/ 02 февраля 2019

В конце концов, я проанализировал, как экспоненциальная скользящая средняя реализована в кадрах данных pandas.Помимо рекурсивной формулы, которую я описал выше, и которую сложно реализовать в любой sql или оконной функции (потому что она рекурсивная), есть еще одна, подробно описанная в их системе отслеживания проблем :

y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
       ((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).

Учитывая это, и с дополнительной помощью по реализации spark из здесь , я получил следующую реализацию, которая примерно эквивалентна pandas_dataframe.ewm (span = window_size) .mean.() .

def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
  val window = Window.partitionBy(partitionColumn)
  val exponentialMovingAveragePrefix = "_EMA_"

  val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
    val alpha = 2.0 / (windowSize + 1)
    val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
      accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
    }
    (adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
  })
  dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
    .withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
    .drop("row_nr")
}

(Я предполагаю, что тип столбца, для которого мне нужно вычислить экспоненциальное скользящее среднее, равен Double.)

Надеюсь, это поможет другим.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...