Вот один из подходов, который вы можете использовать.
Сначала создайте столбец, чтобы определить, изменился ли status
для данного key
:
from pyspark.sql.functions import col, lag
from pyspark.sql import Window
w = Window.partitionBy("key").orderBy("time")
df = df.withColumn(
"status_change",
(col("status") != lag("status").over(w)).cast("int")
)
df.show()
#+---+-----+----+-------+-------------+
#|key|value|time| status|status_change|
#+---+-----+----+-------+-------------+
#| x| 10| 0|running| null|
#| x| 15| 1|running| 0|
#| x| 30| 2|running| 0|
#| x| 15| 3|running| 0|
#| x| 0| 4| stop| 1|
#| x| 40| 5|running| 1|
#| x| 10| 6|running| 0|
#| y| 10| 0|running| null|
#| y| 15| 1|running| 0|
#| y| 30| 2|running| 0|
#| y| 15| 3|running| 0|
#| y| 0| 4| stop| 1|
#| y| 40| 5|running| 1|
#| y| 10| 6|running| 0|
#+---+-----+----+-------+-------------+
Затем заполните null
с 0
и возьмите совокупную сумму столбца status_change
, за key
:
from pyspark.sql.functions import sum as sum_ # avoid shadowing builtin
df = df.fillna(0).withColumn(
"status_group",
sum_("status_change").over(w)
)
df.show()
#+---+-----+----+-------+-------------+------------+
#|key|value|time| status|status_change|status_group|
#+---+-----+----+-------+-------------+------------+
#| x| 10| 0|running| 0| 0|
#| x| 15| 1|running| 0| 0|
#| x| 30| 2|running| 0| 0|
#| x| 15| 3|running| 0| 0|
#| x| 0| 4| stop| 1| 1|
#| x| 40| 5|running| 1| 2|
#| x| 10| 6|running| 0| 2|
#| y| 10| 0|running| 0| 0|
#| y| 15| 1|running| 0| 0|
#| y| 30| 2|running| 0| 0|
#| y| 15| 3|running| 0| 0|
#| y| 0| 4| stop| 1| 1|
#| y| 40| 5|running| 1| 2|
#| y| 10| 6|running| 0| 2|
#+---+-----+----+-------+-------------+------------+
Теперь вы можете агрегировать по key
и status_group
. Вы также можете включить status
в groupBy
, поскольку он будет одинаковым для каждого status_group
. В конце выберите только те столбцы, которые вы хотите использовать в выводе.
from pyspark.sql.functions import min as min_, max as max_
df_agg = df.groupBy("key", "status", "status_group")\
.agg(
min_("time").alias("start"),
max_("time").alias("end"),
max_("value").alias("max_value")
)\
.select("key", "start", "end", "status", "max_value")\
.sort("key", "start")
df_agg.show()
#+---+-----+---+-------+---------+
#|key|start|end| status|max_value|
#+---+-----+---+-------+---------+
#| x| 0| 3|running| 30|
#| x| 4| 4| stop| 0|
#| x| 5| 6|running| 40|
#| y| 0| 3|running| 30|
#| y| 4| 4| stop| 0|
#| y| 5| 6|running| 40|
#+---+-----+---+-------+---------+