pyspark: скользящее среднее с использованием пользовательских временных рядов - PullRequest
0 голосов
/ 21 января 2020

Привет Мой базовый фрейм данных выглядит следующим образом.

'|stockId|timeStamp|stockPrice|'
+-------+---------+----------+
|    101|        1|      53.0|
|    101|        2|      15.0|
|    101|        3|      57.0|
|    101|        4|      71.0|
|    101|        5|      86.0|

Это мой код, который конвертирует дни. Далее следуют окно и среднее окно.

days=lambda i:i*86400
W=Window.partitionBy(F.col('stockId')).orderBy(F.col('epoch_time').cast("timestamp").cast("long")).rangeBetween(-days(3),0)
Df=.withColumn("current_timestamp",F.unix_timestamp(F.lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))\
.withColumn("epoch",F.unix_timestamp("current_timestamp"))\
.withColumn("epoch_time",F.concat(F.col("epoch")+F.col("timeStamp")))\
.withColumn("moving_avg",F.avg("stockPrice").over(W))

Это мой результат .

+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice|  current_timestamp|     epoch|epoch_time|       moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
|    101|        1|      53.0|2020-01-21 10:53:43|1579584223|1579584224|48.21782178217822|
|    101|        2|      15.0|2020-01-21 10:53:43|1579584223|1579584225|48.21782178217822|
|    101|        3|      57.0|2020-01-21 10:53:43|1579584223|1579584226|48.21782178217822|
|    101|        4|      86.0|2020-01-21 10:53:43|1579584223|1579584227|48.21782178217822|

Ожидаемый результат

+-------+---------+----------+-------------------+----------+----------+-----------------+
|stockId|timeStamp|stockPrice|  current_timestamp|     epoch|epoch_time|       moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
|    101|        3|      57.0|2020-01-21 10:53:43|1579584223|1579584226|41.67|
|    101|        4|      71.0|2020-01-21 10:53:43|1579584223|1579584227|47.67|
|    101|        5|      71.0|2020-01-21 10:53:43|1579584223|1579584227|71.33|

1 Ответ

0 голосов
/ 29 января 2020
W=Window.partitionBy(F.col('stockId')).orderBy(F.col('epoch_time').cast("timestamp").cast("long")).rangeBetween(-2,0)

+-------+---------+----------+-------------------+----------+----------+------
|stockId|timeStamp|stockPrice|  current_timestamp|     epoch|epoch_time|       moving_avg|
+-------+---------+----------+-------------------+----------+----------+-----------------+
|    101|        3|      57.0|2020-01-21 10:53:43|1579584223|1579584226|41.67|
|    101|        4|      71.0|2020-01-21 10:53:43|1579584223|1579584227|47.67|
|    101|        5|      71.0|2020-01-21 10:53:43|1579584223|1579584227|71.33|

После продолжения прокатки window rangeBetween(-2,0), где среднее значение перешло от current row до 3rd row of the table.

...