Вы можете использовать оконные функции, чтобы добавить номер строки, используя столбец даты. Затем используйте функцию задержки, чтобы создать новый столбец, который сдвигается на одну позицию вниз, и если значение open_close отличается от предыдущего, ставится «1», в противном случае - «0». Наконец, сгруппируйте по идентификатору компании и суммируйте изменения, отмеченные как 1.
val df2 = df.withColumn("row_num",row_number.over(Window.orderBy('datetime).partitionBy('id)))
val df3 = df2.select('*,lag('open_close, 1, 0).over(Window.orderBy('row_num).partitionBy('id)).as("lag"))
val df4 = df3.select('*,when('open_close === 'lag || 'lag === 0 , 0).otherwise(1).as("change"))
df4.groupBy('id).agg(sum('change)).show()
+---+-----------+
| id|sum(change)|
+---+-----------+
| 1| 1|
| 3| 2|
+---+-----------+