В качестве оптимизации ответа cronoik, вы можете использовать тот факт, что у вас уже есть глобальный уникальный столбец 'idx' в вашем наборе данных, и установить ваши 'trip_id' и 'subtrip_id' в качестве idx строки, в которой отключение или подстрока начинается.
Это позволит вам сгенерировать trip_id и subtrip_id за один проход с монотонно увеличивающимися целыми числами. Поскольку вам нужно, чтобы subtrip_id начинался с 1 и увеличивался с шагом 1, у вас будет второй проход окна:
import sys
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([
(1, 110, 'aaa', 'walk', 'work'),
(2, 110, 'aaa', 'walk', 'work'),
(3, 110, 'aaa', 'bus', 'work'),
(4, 110, 'aaa', 'bus', 'work'),
(5, 110, 'aaa', 'walk','work'),
(6, 110, 'bbb', 'walk', 'home'),
(7, 110, 'bbb', 'bus', 'home'),
(8, 110, 'bbb', 'bus', 'home'),
(9, 110, 'bbb', 'walk', 'home')
],
['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
)
tripW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')
df_final = df\
.withColumn("trip_id", F.first('idx').over(tripW))\
.withColumn('lagged_mode', F.lag('mode', default='').over(tripW))\
.withColumn('subtrip_id', (F.col('mode') != F.col('lagged_mode')).cast('int'))\
.withColumn('subtrip_id',F.sum('subtrip_id').over(tripW))\
.drop('lagged_mode')\
.sort('idx')
df_final.show()
+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
| 1| 110| aaa|walk|work| 1| 1|
| 2| 110| aaa|walk|work| 1| 1|
| 3| 110| aaa| bus|work| 1| 2|
| 4| 110| aaa| bus|work| 1| 2|
| 5| 110| aaa|walk|work| 1| 3|
| 6| 110| bbb|walk|home| 6| 1|
| 7| 110| bbb| bus|home| 6| 2|
| 8| 110| bbb| bus|home| 6| 2|
| 9| 110| bbb|walk|home| 6| 3|
+---+------+------+----+----+-------+----------+
Если вы посмотрите на сгенерированный план, Spark может построить его довольно эффективно:
df_final.explain()
TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#12637,u_uuid#12638,p_uuid#2863,mode#2864,dest#2865,trip_id#12642,subtrip_id#12643])
+- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, subtrip_id#12614L]
+- Window [sum(cast(subtrip_id#12604 as bigint)) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#12614L], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, cast(NOT (mode#2864 = lagged_mode#12596) as int) AS subtrip_id#12604]
+- Window [first(idx#2861L, false) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS trip_id#12589L, lag(mode#2864, 1, ) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lagged_mode#12596], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
+- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]
Решение cronoik использует другие критерии сортировки или окна и, таким образом, потребует более дорогих шагов сортировки и более длительного времени выполнения:
df_subtrip.sort('idx').explain()
TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#2948,u_uuid#2949,p_uuid#2863,mode#2864,dest#2865,trip_id#2953,subtrip_id#2954])
+- *(4) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, subtrip_id#2933]
+- Window [last(subtrip_id#2923, true) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#2933], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(3) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN (subtrip_id#2913 = 1) THEN _we0#2924 ELSE null END AS subtrip_id#2923]
+- Window [row_number() windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#2924], [u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913], [idx#2861L ASC NULLS FIRST]
+- *(2) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, subtrip_id#2913 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN NOT (mode#2864 = _we0#2914) THEN 1 ELSE null END AS subtrip_id#2913]
+- Window [lag(mode#2864, 1, SOMETHING) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#2914], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- *(1) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, SPARK_PARTITION_ID() AS trip_id#2887]
+- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
+- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]