Как я могу сгруппировать (в Scala) строки кадра данных и суммировать значения столбца этих строк? - PullRequest
1 голос
/ 03 июля 2019

У меня есть такой фрейм данных:

ev1    ev2    Score    seconds
A      A       9        0
B      E       1        0
C      C       6        8
D      B       3        10
E      D       5        0
A      E       8        0
C      F       6        0
E      C       3        0
F      B       6        11
D      B       7        0
A      B       9        0
D      G       8        0
G      A       6        9
...    ...     ...      ...

И я хочу сгруппировать строки, пока значения «секунд» не будут между 9 и 11, и я хочу суммировать значение «Оценка» этих строк.

В выводе у меня должно быть что-то вроде этого:

group    sum
   1     19
   2     28
   3     30
   ...    ...

Если первый раздел содержит строки с оценками (9 1 6 3), а «сумма» (19) является суммой этих значений, второй включает (5 8 6 3 6) и т. Д.

1 Ответ

2 голосов
/ 03 июля 2019

Здесь вы можете использовать оконные функции для определения ваших групп.

Чтобы определить, является ли это новая группа, нам нужно проверить, находится ли предыдущее значение seconds между 9 и 11.

// Some useful imports
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window

// Your data with an order defined by monotanically_increasing_id as you are reading it, before any shuffle.
val df = Seq(
("A", "A", 9, 0),
("B", "E", 1, 0),
("C", "C", 6, 8),
("D", "B", 3, 10),
("E", "D", 5, 0),
("A", "E", 8, 0),
("C", "F", 6, 0),
("E", "C", 3, 0),
("F", "B", 6, 11),
("D", "B", 7, 0),
("A", "B", 9, 0),
("D", "G", 8, 0),
("G", "A", 6, 9)
).toDF("ev1", "ev2", "Score", "seconds").withColumn("time_col", F.monotonically_increasing_id)

// Here we are defining the groupId using Window function

val groupIdWindow = Window.orderBy("time_col")

val df2 = df.
withColumn("lagged_seconds", F.lag('seconds, 1, 0) over groupIdWindow).
withColumn("newGroup", ('lagged_seconds > 8 && 'lagged_seconds < 12).cast("bigint")).
withColumn("groupId", sum("newGroup").over(groupIdWindow) + 1)

df2.show
/*

+---+---+-----+-------+--------+--------------+--------+-------+
|ev1|ev2|Score|seconds|time_col|lagged_seconds|newGroup|groupId|
+---+---+-----+-------+--------+--------------+--------+-------+
|  A|  A|    9|      0|       0|             0|       0|      1|
|  B|  E|    1|      0|       1|             0|       0|      1|
|  C|  C|    6|      8|       2|             0|       0|      1|
|  D|  B|    3|     10|       3|             8|       0|      1|
|  E|  D|    5|      0|       4|            10|       1|      2|
|  A|  E|    8|      0|       5|             0|       0|      2|
|  C|  F|    6|      0|       6|             0|       0|      2|
|  E|  C|    3|      0|       7|             0|       0|      2|
|  F|  B|    6|     11|       8|             0|       0|      2|
|  D|  B|    7|      0|       9|            11|       1|      3|
|  A|  B|    9|      0|      10|             0|       0|      3|
|  D|  G|    8|      0|      11|             0|       0|      3|
|  G|  A|    6|      9|      12|             0|       0|      3|
+---+---+-----+-------+--------+--------------+--------+-------+

*/

// And now, a simple groupBy

df2.groupBy("groupId").agg(F.sum("Score").as("Score")).show
/*
-------+-----+
|groupId|Score|
+-------+-----+
|      1|   19|
|      2|   28|
|      3|   30|
+-------+-----+
*/
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...