Окно структурированной потоковой передачи Pyspark (скользящее среднее) по последним N точкам данных - PullRequest
0 голосов
/ 23 января 2020

Я прочитал несколько фреймов данных из тем Кафки, используя Pyspark Структурированный Поток 2.4.4. Я хотел бы добавить несколько новых столбцов к этим фреймам данных, которые в основном основаны на расчетах окон по прошлым N точкам данных (например: скользящее среднее по последним 20 точкам данных), и по мере доставки новой точки данных соответствующее значение MA_20 должен быть мгновенно рассчитан.

Данные могут выглядеть так: Отметка времени | VIX

2020-01-22 10:20:32 | 13.05
2020-01-22 10:25:31 | 14.35
2020-01-23 09:00:20 | 14.12

Стоит отметить, что данные будут приниматься с понедельника по пятницу в течение 8 часов в день. Таким образом, скользящее среднее значение, рассчитанное в понедельник утром, должно включать в себя данные с пятницы!

Я пробовал разные подходы, но все еще не могу достичь того, чего хочу.

windows = df_vix \
    .withWatermark("Timestamp", "100 minutes") \
    .groupBy(F.window("Timestamp", "100 minute", "5 minute")) \

aggregatedDF = windows.agg(F.avg("VIX"))

Предыдущий код рассчитан MA, но данные с пятницы будут рассмотрены с опозданием, поэтому они будут исключены. лучше, чем последние 100 минут, должны быть последние 20 точек (с 5-минутными интервалами).

Я думал, что могу использовать rowBetween или rangeBetween, но в кадрах потоковых данных окно нельзя применять ie к столбцам без отметки времени (F.col ('Timestamp'). Cast ('long'))

    w = Window.orderBy(F.col('Timestamp').cast('long')).rowsBetween(-600, 0)

    df = df_vix.withColumn('MA_20', F.avg('VIX').over(w)

)

Но, с другой стороны, нет возможности указать интервал в rowBetween (), используя rowBetween (- minutes (20) ), 0) throws: минуты не определены (в sql .functions такой функции нет)

Я нашел другой способ, но он также не работает для потоковых фреймов данных. Не знаю, почему «не основанные на времени windows не поддерживаются при потоковой передаче данных» возникает ошибка (df_vix.Timestamp имеет тип отметки времени)

df.createOrReplaceTempView("df_vix")

df_vix.createOrReplaceTempView("df_vix")
aggregatedDF = spark.sql(
    """SELECT *, mean(VIX) OVER (
        ORDER BY CAST(df_vix.Timestamp AS timestamp)
        RANGE BETWEEN INTERVAL 100 MINUTES PRECEDING AND CURRENT ROW
     ) AS mean FROM df_vix""")

Я понятия не имею, что еще я мог использовать для расчета простого скользящего среднего. Похоже, что невозможно достичь этого в Pyspark ... возможно, лучшим решением будет преобразовывать каждый раз, когда новые данные передают весь фрейм данных Spark в Pandas и вычислять все в Pandas (или добавлять новые строки в pandas и вычислите MA) ???

Я думал, что создание новых функций по мере поступления новых данных является основной целью структурированного потокового вещания, но, как оказалось, Pyspark не подходит для этого, я собираюсь отказаться Pyspark ход к Pandas ...

EDIT

Следующее не работает, хотя df_vix.Timestamp типа: 'timestamp', но выдает 'Non-time windows на основе не поддерживаются при ошибке потоковой передачи данных.

w = Window.orderBy(df_vix.Timestamp).rowsBetween(-20, -1)
aggregatedDF = df_vix.withColumn("MA", F.avg("VIX").over(w))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...