Для Spark 2.4+ вы можете использовать функции массива и функции высшего порядка. Это решение будет работать для разных размеров массива (если событие отличается в каждой строке). Вот объясненные шаги:
Сначала сгруппируйте по 2 секундам и соберите vars
в столбце массива:
df = df.groupBy((ceil(col("timestamp") / 2) * 2).alias("timestamp")) \
.agg(collect_list(col("vars")).alias("vars"))
df.show()
#+---------+----------------------+
#|timestamp|vars |
#+---------+----------------------+
#|6 |[[1, 1, 1], [1, 2, 3]]|
#|2 |[[1, 1, 1], [1, 2, 1]]|
#|4 |[[1, 1, 1], [1, 3, 4]]|
#+---------+----------------------+
Здесь мы сгруппировали каждые 2 секунды и собрали vars
массивы в новый список. Теперь, используя Window spe c, вы можете собирать совокупные значения и использовать flatten
, чтобы сгладить подмассивы:
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("vars", flatten(collect_list(col("vars")).over(w)))
df.show()
#+---------+------------------------------------------------------------------+
#|timestamp|vars |
#+---------+------------------------------------------------------------------+
#|2 |[[1, 1, 1], [1, 2, 1]] |
#|4 |[[1, 1, 1], [1, 2, 1], [1, 1, 1], [1, 3, 4]] |
#|6 |[[1, 1, 1], [1, 2, 1], [1, 1, 1], [1, 3, 4], [1, 1, 1], [1, 2, 3]]|
#+---------+------------------------------------------------------------------+
Наконец, используйте aggregate
функция с zip_with
для суммирования массивов:
t = "aggregate(vars, cast(array() as array<double>), (acc, a) -> zip_with(acc, a, (x, y) -> coalesce(x, 0) + coalesce(y, 0)))"
df.withColumn("vars", expr(t)).show(truncate=False)
#+---------+-----------------+
#|timestamp|vars |
#+---------+-----------------+
#|2 |[2.0, 3.0, 2.0] |
#|4 |[4.0, 7.0, 7.0] |
#|6 |[6.0, 10.0, 11.0]|
#+---------+-----------------+
Соединение всех вместе:
from pyspark.sql.functions import ceil, col, collect_list, flatten, expr
from pyspark.sql import Window
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
t = "aggregate(vars, cast(array() as array<double>), (acc, a) -> zip_with(acc, a, (x, y) -> coalesce(x, 0) + coalesce(y, 0)))"
nb_seconds = 2
df.groupBy((ceil(col("timestamp") / nb_seconds) * nb_seconds).alias("timestamp")) \
.agg(collect_list(col("vars")).alias("vars")) \
.withColumn("vars", flatten(collect_list(col("vars")).over(w))) \
.withColumn("vars", expr(t)).show(truncate=False)