Как подсчитать изменения записей для определенного значения столбца в scala Dataframe - PullRequest
1 голос
/ 08 мая 2020

В кадре данных столбцы имеют ввод, показанный ниже:

    | id|  priority|         status|       datetime|data_as_of_Date|Amount|open_close|
    |  1|Unassigned|          Fixed| 10/8/2019 0:00| 2/12/2020 0:00|    40|    Closed|
    |  1|Unassigned|            New|2/12/2019 11:00| 2/12/2020 0:00|    20|      Open|
    |  1|Unassigned|Fix in progress|9/12/2019 11:00| 2/12/2020 0:00|    90|      Open|
    |  3|  Critical|        Removed|5/17/2019 12:00| 2/12/2020 0:00|    33|    Closed|
    |  3|Unassigned|Fix in progress|5/26/2019 10:00| 2/12/2020 0:00|    30|      Open|
    |  3|  Critical|            New|  5/8/2019 3:00| 2/12/2020 0:00|    34|      Open|
    |  3|Unassigned|          Fixed| 7/29/2019 7:00| 2/12/2020 0:00|    29|    Closed|

Как я могу рассчитать количество изменений столбца open_close для каждой компании?

1 Ответ

1 голос
/ 09 мая 2020

Вы можете использовать оконные функции, чтобы добавить номер строки, используя столбец даты. Затем используйте функцию задержки, чтобы создать новый столбец, который сдвигается на одну позицию вниз, и если значение open_close отличается от предыдущего, ставится «1», в противном случае - «0». Наконец, сгруппируйте по идентификатору компании и суммируйте изменения, отмеченные как 1.

val df2 = df.withColumn("row_num",row_number.over(Window.orderBy('datetime).partitionBy('id)))
val df3 = df2.select('*,lag('open_close, 1, 0).over(Window.orderBy('row_num).partitionBy('id)).as("lag"))
val df4 = df3.select('*,when('open_close === 'lag || 'lag === 0 , 0).otherwise(1).as("change"))
df4.groupBy('id).agg(sum('change)).show()

+---+-----------+
| id|sum(change)|
+---+-----------+
|  1|          1|
|  3|          2|
+---+-----------+
...