Я предоставляю решение с некоторыми допущениями.
Предполагается, что предыдущая ссылка может быть найдена в максимуме предыдущих 'n' строк. Если 'n' - разумное меньшее значение, я думаю, что это хорошее решение.
Я предположил, что вы можете найти предыдущую ссылку в 5 строках.
def get_distincts(list, current_value):
cnt = {}
flag = False
for i in list:
if current_value == i :
flag = True
break
else:
cnt[i] = "some_value"
if flag:
return len(cnt)
else:
return None
get_distincts_udf = udf(get_distincts, IntegerType())
df = spark.createDataFrame([["a"],["b"],["c"],["c"],["a"],["b"],["a"]]).toDF("col1")
#You can replace this, if you have some unique id column
df = df.withColumn("seq_id", monotonically_increasing_id())
window = Window.orderBy("seq_id")
df = df.withColumn("list", array([lag(col("col1"),i, None).over(window) for i in range(1,6) ]))
df = df.withColumn("col2", get_distincts_udf(col('list'), col('col1'))).drop('seq_id','list')
df.show()
что дает
+----+----+
|col1|col2|
+----+----+
| a|null|
| b|null|
| c|null|
| c| 0|
| a| 2|
| b| 2|
| a| 1|
+----+----+