Этого можно добиться с помощью оконной функции, например dense_rank. Посмотрите на комментарии ниже:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Sample interactions: (userid, interaction date as a string).
rows = [
    ('1', '2018-01-02'),
    ('2', '2018-01-03'),
    ('1', '2018-01-03'),
    ('1', '2018-01-04'),
    ('2', '2018-01-02'),
    ('3', '2018-01-03'),
    ('4', '2018-01-03'),
]
schema = ['userid', 'interaction_timestamp']

df = spark.createDataFrame(rows, schema)
# Parse the string column into a proper date type.
df = df.withColumn('interaction_timestamp', F.to_date('interaction_timestamp', 'yyyy-MM-dd'))

# One window partition per user, ordered newest interaction first.
w = Window.partitionBy('userid').orderBy(F.desc('interaction_timestamp'))

# dense_rank numbers each row within its user's partition per that order.
df.withColumn("interaction_date_order", F.dense_rank().over(w)).show()
Вывод:
+------+---------------------+----------------------+
|userid|interaction_timestamp|interaction_date_order|
+------+---------------------+----------------------+
| 3| 2018-01-03| 1|
| 1| 2018-01-04| 1|
| 1| 2018-01-03| 2|
| 1| 2018-01-02| 3|
| 4| 2018-01-03| 1|
| 2| 2018-01-03| 1|
| 2| 2018-01-02| 2|
+------+---------------------+----------------------+