Вам понадобится столбец, который определяет порядок вашего DataFrame.Если он еще не существует, вы можете создать его, используя pyspark.sql.functions.monotonically_increasing_id
.
import pyspark.sql.functions as f
df = df.withColumn("id", f.monotonically_increasing_id())
Далее вы можете использовать технику, описанную в в этом посте для созданиясегменты для каждого набора последовательных дубликатов:
import sys
import pyspark.sql.Window
globalWindow = Window.orderBy("id")
upToThisRowWindow = globalWindow.rowsBetween(-sys.maxsize-1, 0)
df = df.withColumn(
"segment",
f.sum(
f.when(
f.lag("Col-2", 1).over(globalWindow) != f.col("Col-2"),
1
).otherwise(0)
).over(upToThisRowWindow)+1
)
Теперь вы можете группировать по сегментам и агрегировать, используя pyspark.sql.functions.collect_list
для сбора значений в список и pyspark.sql.functions.concat()
для объединения строк:
df = df.groupBy('segment').agg(f.concat(f.collect_list('Col-2'))).drop('segment')