If you can be sure that every IN event is always followed by an OUT event, you can use the code below (it checks that the previous event was IN and the current one is OUT, but it will not produce correct results if IN and OUT do not strictly alternate).
import datetime

from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window as W

# creation of the DataFrame (assumes an existing SparkSession named `spark`)
test_df = spark.createDataFrame([
    (2, datetime.datetime(2018, 8, 28, 12, 0), "IN"), (2, datetime.datetime(2018, 8, 28, 12, 20), "OUT"),
    (3, datetime.datetime(2018, 8, 28, 13, 0), "IN"), (3, datetime.datetime(2018, 8, 28, 14, 20), "OUT"),
    (3, datetime.datetime(2018, 8, 28, 15, 0), "IN"), (3, datetime.datetime(2018, 8, 28, 16, 0), "OUT"),
], ("USER", "DATETIME", "IN_OUT"))

# order by datetime and process every user separately
w = W.partitionBy("USER").orderBy("DATETIME")

# if the previous event in the window was IN and the current one is OUT,
# carry the IN timestamp forward to the OUT row
get_in = when((lag("IN_OUT", 1).over(w) == "IN") & (col("IN_OUT") == "OUT"),
              lag("DATETIME", 1).over(w)).otherwise(None)

# keep only OUT rows preceded by an IN and compute the session length in minutes
(test_df.withColumn("DATETIMEIN", get_in.cast("timestamp"))
    .withColumn("DATETIMEOUT", col("DATETIME"))
    .filter(col("DATETIMEIN").isNotNull())
    .withColumn("SESSIONTIME[Minutes]",
                (col("DATETIME").cast("long") - col("DATETIMEIN").cast("long")) / 60)
    .select("USER", "DATETIMEIN", "DATETIMEOUT", "SESSIONTIME[Minutes]")
    .show())
Result:
+----+-------------------+-------------------+--------------------+
|USER| DATETIMEIN| DATETIMEOUT|SESSIONTIME[Minutes]|
+----+-------------------+-------------------+--------------------+
| 3|2018-08-28 13:00:00|2018-08-28 14:20:00| 80.0|
| 3|2018-08-28 15:00:00|2018-08-28 16:00:00| 60.0|
| 2|2018-08-28 12:00:00|2018-08-28 12:20:00| 20.0|
+----+-------------------+-------------------+--------------------+
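For comparison, the same lag-based pairing logic can be sketched in plain Python without Spark — a minimal illustration, assuming the events for each user are sorted by timestamp (the function name `session_minutes` is hypothetical, not part of the original answer):

```python
import datetime
from collections import defaultdict

def session_minutes(events):
    """Pair each OUT event with the immediately preceding IN event per user.

    events: list of (user, timestamp, "IN"/"OUT") tuples.
    Returns a list of (user, in_time, out_time, minutes) tuples.
    Mirrors the window logic: only an OUT whose lag(1) row is an IN
    (per user, in timestamp order) produces a session row.
    """
    per_user = defaultdict(list)
    for user, ts, kind in events:
        per_user[user].append((ts, kind))

    sessions = []
    for user, rows in per_user.items():
        rows.sort()  # corresponds to orderBy("DATETIME") within each partition
        for prev, curr in zip(rows, rows[1:]):  # curr's lag(1) row is prev
            if prev[1] == "IN" and curr[1] == "OUT":
                minutes = (curr[0] - prev[0]).total_seconds() / 60
                sessions.append((user, prev[0], curr[0], minutes))
    return sessions

data = [
    (2, datetime.datetime(2018, 8, 28, 12, 0), "IN"),
    (2, datetime.datetime(2018, 8, 28, 12, 20), "OUT"),
    (3, datetime.datetime(2018, 8, 28, 13, 0), "IN"),
    (3, datetime.datetime(2018, 8, 28, 14, 20), "OUT"),
    (3, datetime.datetime(2018, 8, 28, 15, 0), "IN"),
    (3, datetime.datetime(2018, 8, 28, 16, 0), "OUT"),
]
for user, t_in, t_out, mins in session_minutes(data):
    print(user, t_in, t_out, mins)
```

This reproduces the three session rows from the table above (20.0, 80.0 and 60.0 minutes) and makes the alternation caveat visible: two IN events in a row would simply drop the first one.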