Я прочитал несколько фреймов данных из тем Кафки, используя Pyspark Структурированный Поток 2.4.4. Я хотел бы добавить несколько новых столбцов к этим фреймам данных, которые в основном основаны на расчетах окон по прошлым N точкам данных (например: скользящее среднее по последним 20 точкам данных), и по мере доставки новой точки данных соответствующее значение MA_20 должен быть мгновенно рассчитан.
Данные могут выглядеть так: Отметка времени | VIX
2020-01-22 10:20:32 | 13.05
2020-01-22 10:25:31 | 14.35
2020-01-23 09:00:20 | 14.12
Стоит отметить, что данные будут приниматься с понедельника по пятницу в течение 8 часов в день. Таким образом, скользящее среднее значение, рассчитанное в понедельник утром, должно включать в себя данные с пятницы!
Я пробовал разные подходы, но все еще не могу достичь того, чего хочу.
windows = df_vix \
.withWatermark("Timestamp", "100 minutes") \
.groupBy(F.window("Timestamp", "100 minute", "5 minute")) \
aggregatedDF = windows.agg(F.avg("VIX"))
Предыдущий код рассчитан MA, но данные с пятницы будут рассмотрены с опозданием, поэтому они будут исключены. лучше, чем последние 100 минут, должны быть последние 20 точек (с 5-минутными интервалами).
Я думал, что могу использовать rowBetween или rangeBetween, но в кадрах потоковых данных окно нельзя применять ie к столбцам без отметки времени (F.col ('Timestamp'). Cast ('long'))
w = Window.orderBy(F.col('Timestamp').cast('long')).rowsBetween(-600, 0)
df = df_vix.withColumn('MA_20', F.avg('VIX').over(w)
)
Но, с другой стороны, нет возможности указать интервал в rowBetween (), используя rowBetween (- minutes (20) ), 0) throws: минуты не определены (в sql .functions такой функции нет)
Я нашел другой способ, но он также не работает для потоковых фреймов данных. Не знаю, почему «не основанные на времени windows не поддерживаются при потоковой передаче данных» возникает ошибка (df_vix.Timestamp имеет тип отметки времени)
df.createOrReplaceTempView("df_vix")
df_vix.createOrReplaceTempView("df_vix")
aggregatedDF = spark.sql(
"""SELECT *, mean(VIX) OVER (
ORDER BY CAST(df_vix.Timestamp AS timestamp)
RANGE BETWEEN INTERVAL 100 MINUTES PRECEDING AND CURRENT ROW
) AS mean FROM df_vix""")
Я понятия не имею, что еще я мог использовать для расчета простого скользящего среднего. Похоже, что невозможно достичь этого в Pyspark ... возможно, лучшим решением будет преобразовывать каждый раз, когда новые данные передают весь фрейм данных Spark в Pandas и вычислять все в Pandas (или добавлять новые строки в pandas и вычислите MA) ???
Я думал, что создание новых функций по мере поступления новых данных является основной целью структурированного потокового вещания, но, как оказалось, Pyspark не подходит для этого, я собираюсь отказаться Pyspark ход к Pandas ...
EDIT
Следующее не работает, хотя df_vix.Timestamp типа: 'timestamp', но выдает 'Non-time windows на основе не поддерживаются при ошибке потоковой передачи данных.
w = Window.orderBy(df_vix.Timestamp).rowsBetween(-20, -1)
aggregatedDF = df_vix.withColumn("MA", F.avg("VIX").over(w))