Рассчитать скользящую сумму массива в PySpark с помощью Window ()? - PullRequest
2 голосов
/ 25 февраля 2020

Я хочу вычислить скользящую сумму столбца ArrayType с учетом отметки времени unix и сгруппировать ее с шагом 2 секунды. Пример ввода / вывода приведен ниже. Я думаю, что функция Window () будет работать, я довольно плохо знаком с PySpark и полностью потерян. Любой вклад с благодарностью!

Ввод:

timestamp     vars 
2             [1,2,1,2]
2             [1,2,1,2]
3             [1,1,1,2]
4             [1,3,4,2]
5             [1,1,1,3]
6             [1,2,3,5]
9             [1,2,3,5]

Ожидаемый вывод:

+---------+-----------------------+
|timestamp|vars                   |
+---------+-----------------------+
|2        |[2.0, 4.0, 2.0, 4.0]   |
|4        |[4.0, 8.0, 7.0, 8.0]   |
|6        |[6.0, 11.0, 11.0, 16.0]|
|10       |[7.0, 13.0, 14.0, 21.0]|
+---------+-----------------------+

Спасибо!

Редактировать: несколько столбцов могут иметь одну и ту же метку времени / могут не иметь быть последовательным Длина переменных также может быть> 3. Ищите немного универсальное c решение, пожалуйста.

Ответы [ 2 ]

2 голосов
/ 25 февраля 2020

Для Spark 2.4+ вы можете использовать функции массива и функции высшего порядка. Это решение будет работать для разных размеров массива (если событие отличается в каждой строке). Вот объясненные шаги:

Сначала сгруппируйте по 2 секундам и соберите vars в столбце массива:

df = df.groupBy((ceil(col("timestamp") / 2) * 2).alias("timestamp")) \
       .agg(collect_list(col("vars")).alias("vars"))

df.show()

#+---------+----------------------+
#|timestamp|vars                  |
#+---------+----------------------+
#|6        |[[1, 1, 1], [1, 2, 3]]|
#|2        |[[1, 1, 1], [1, 2, 1]]|
#|4        |[[1, 1, 1], [1, 3, 4]]|
#+---------+----------------------+

Здесь мы сгруппировали каждые 2 секунды и собрали vars массивы в новый список. Теперь, используя Window spe c, вы можете собирать совокупные значения и использовать flatten, чтобы сгладить подмассивы:

w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("vars", flatten(collect_list(col("vars")).over(w)))
df.show()

#+---------+------------------------------------------------------------------+
#|timestamp|vars                                                              |
#+---------+------------------------------------------------------------------+
#|2        |[[1, 1, 1], [1, 2, 1]]                                            |
#|4        |[[1, 1, 1], [1, 2, 1], [1, 1, 1], [1, 3, 4]]                      |
#|6        |[[1, 1, 1], [1, 2, 1], [1, 1, 1], [1, 3, 4], [1, 1, 1], [1, 2, 3]]|
#+---------+------------------------------------------------------------------+

Наконец, используйте aggregate функция с zip_with для суммирования массивов:

t = "aggregate(vars, cast(array() as array<double>), (acc, a) -> zip_with(acc, a, (x, y) -> coalesce(x, 0) + coalesce(y, 0)))"

df.withColumn("vars", expr(t)).show(truncate=False)

#+---------+-----------------+
#|timestamp|vars             |
#+---------+-----------------+
#|2        |[2.0, 3.0, 2.0]  |
#|4        |[4.0, 7.0, 7.0]  |
#|6        |[6.0, 10.0, 11.0]|
#+---------+-----------------+

Соединение всех вместе:

from pyspark.sql.functions import ceil, col, collect_list, flatten, expr
from pyspark.sql import Window

w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
t = "aggregate(vars, cast(array() as array<double>), (acc, a) -> zip_with(acc, a, (x, y) -> coalesce(x, 0) + coalesce(y, 0)))"

nb_seconds = 2

df.groupBy((ceil(col("timestamp") / nb_seconds) * nb_seconds).alias("timestamp")) \
  .agg(collect_list(col("vars")).alias("vars")) \
  .withColumn("vars", flatten(collect_list(col("vars")).over(w))) \
  .withColumn("vars", expr(t)).show(truncate=False)
1 голос
/ 25 февраля 2020

Использование оконной функции sum для вычисления промежуточной суммы и row_number для выбора каждой второй строки метки времени.

from pyspark.sql import Window
w = Window.orderBy(col('timestamp'))
result = df.withColumn('summed_vars',array([sum(col('vars')[i]).over(w) for i in range(3)])) #change the value 3 as desired
result.filter(col('rnum')%2 == 0).select('timestamp','summed_vars').show()

Измените %2 в соответствии с вашим временным интервалом.

Редактировать: Группировка по временным интервалам с window. Предполагая, что столбец timestamp имеет тип данных timestamp.

from pyspark.sql import Window
from pyspark.sql.functions import window,sum,row_number,array,col 
w = Window.orderBy(col('timestamp'))
result = df.withColumn('timestamp_interval',window(col('timestamp'),'2 second')) \
           .withColumn('summed_vars',array(*[sum(col('vars')[i]).over(w) for i in range(4)])) 
w1 = Window.partitionBy(col('timestamp_interval')).orderBy(col('timestamp').desc())
final_result = result.withColumn('rnum',row_number().over(w1))
final_result.filter(col('rnum')==1).drop(*['rnum','vars']).show()
...