Здесь вы можете использовать оконные функции для определения ваших групп.
Чтобы определить, является ли это новая группа, нам нужно проверить, находится ли предыдущее значение seconds
между 9 и 11.
// Some useful imports
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
// Your data with an order defined by monotanically_increasing_id as you are reading it, before any shuffle.
val df = Seq(
("A", "A", 9, 0),
("B", "E", 1, 0),
("C", "C", 6, 8),
("D", "B", 3, 10),
("E", "D", 5, 0),
("A", "E", 8, 0),
("C", "F", 6, 0),
("E", "C", 3, 0),
("F", "B", 6, 11),
("D", "B", 7, 0),
("A", "B", 9, 0),
("D", "G", 8, 0),
("G", "A", 6, 9)
).toDF("ev1", "ev2", "Score", "seconds").withColumn("time_col", F.monotonically_increasing_id)
// Here we are defining the groupId using Window function
val groupIdWindow = Window.orderBy("time_col")
val df2 = df.
withColumn("lagged_seconds", F.lag('seconds, 1, 0) over groupIdWindow).
withColumn("newGroup", ('lagged_seconds > 8 && 'lagged_seconds < 12).cast("bigint")).
withColumn("groupId", sum("newGroup").over(groupIdWindow) + 1)
df2.show
/*
+---+---+-----+-------+--------+--------------+--------+-------+
|ev1|ev2|Score|seconds|time_col|lagged_seconds|newGroup|groupId|
+---+---+-----+-------+--------+--------------+--------+-------+
| A| A| 9| 0| 0| 0| 0| 1|
| B| E| 1| 0| 1| 0| 0| 1|
| C| C| 6| 8| 2| 0| 0| 1|
| D| B| 3| 10| 3| 8| 0| 1|
| E| D| 5| 0| 4| 10| 1| 2|
| A| E| 8| 0| 5| 0| 0| 2|
| C| F| 6| 0| 6| 0| 0| 2|
| E| C| 3| 0| 7| 0| 0| 2|
| F| B| 6| 11| 8| 0| 0| 2|
| D| B| 7| 0| 9| 11| 1| 3|
| A| B| 9| 0| 10| 0| 0| 3|
| D| G| 8| 0| 11| 0| 0| 3|
| G| A| 6| 9| 12| 0| 0| 3|
+---+---+-----+-------+--------+--------------+--------+-------+
*/
// And now, a simple groupBy
df2.groupBy("groupId").agg(F.sum("Score").as("Score")).show
/*
-------+-----+
|groupId|Score|
+-------+-----+
| 1| 19|
| 2| 28|
| 3| 30|
+-------+-----+
*/