Spark Dataframe: группировать и ранжировать строки по определенному значению столбца - PullRequest
0 голосов
/ 14 февраля 2020

Я пытаюсь ранжировать столбец, когда нумерация столбцов «ID» начинается с 1 до максимума, а затем сбрасывается с 1.

Итак, первые три строки имеют непрерывную нумерацию на «ID»; следовательно, они должны быть сгруппированы с группой rank = 1. Строки четыре и пять находятся в другой группе, ранг группы = 2.

Строки отсортированы по столбцу «rownum». Мне известна оконная функция row_number, но я не думаю, что могу подать заявку на этот вариант использования, так как нет постоянного окна. Я могу думать только о циклическом просмотре каждой строки в кадре данных, но не уверен, как мне обновить столбец, когда число сбрасывается до 1.

val df = Seq ((1, 1), (2, 2), (3, 3), (4, 1), (5, 2), (6, 1), (7, 1), (8, 2)) .toDF («rownum», «ID») df.show ()

enter image description here

Ожидаемый результат ниже: enter image description here

Ответы [ 2 ]

3 голосов
/ 14 февраля 2020

Вы можете сделать это с помощью двух оконных функций: первая для обозначения состояния, вторая для вычисления промежуточной суммы:

df
  .withColumn("increase", $"ID" > lag($"ID",1).over(Window.orderBy($"rownum")))
  .withColumn("group_rank_of_ID",sum(when($"increase",lit(0)).otherwise(lit(1))).over(Window.orderBy($"rownum")))
  .drop($"increase")
  .show()

дает:

+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
|     1|  1|               1|
|     2|  2|               1|
|     3|  3|               1|
|     4|  1|               2|
|     5|  2|               2|
|     6|  1|               3|
|     7|  1|               4|
|     8|  2|               4|
+------+---+----------------+
2 голосов
/ 14 февраля 2020

Как заметил @Prithvi, мы можем использовать lead здесь.

Хитрая часть, чтобы использовать оконную функцию, такую ​​как lead, нам нужно по крайней мере предоставить заказ.

Рассмотрим


val nextID = lag('ID, 1, -1) over Window.orderBy('rownum)
val isNewGroup = 'ID <= nextID cast "integer"
val group_rank_of_ID = sum(isNewGroup) over Window.orderBy('rownum)

/* you can try 
df.withColumn("intermediate", nextID).show
//                           ^^^^^^^-- can be `isNewGroup`, or other vals
*/



df.withColumn("group_rank_of_ID", group_rank_of_ID).show

/* returns
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
|     1|  1|               0|
|     2|  2|               0|
|     3|  3|               0|
|     4|  1|               1|
|     5|  2|               1|
|     6|  1|               2|
|     7|  1|               3|
|     8|  2|               3|
+------+---+----------------+
*/


df.withColumn("group_rank_of_ID", group_rank_of_ID + 1).show

/* returns
+------+---+----------------+
|rownum| ID|group_rank_of_ID|
+------+---+----------------+
|     1|  1|               1|
|     2|  2|               1|
|     3|  3|               1|
|     4|  1|               2|
|     5|  2|               2|
|     6|  1|               3|
|     7|  1|               4|
|     8|  2|               4|
+------+---+----------------+
*/
...