Как мы можем сгенерировать уникальный идентификатор сеанса для данных потока кликов, используя кадры данных Spark (Scala) со следующими двумя условиями?
- Сеанс истекает через 30 минут бездействия (означает отсутствие данных потока кликов в течение 30 минут)
- Сессия остается активной в течение 2 часов. Через 2 часа возобновите сеанс.
Введите:
UserId | Click Time
-----------------------------
U1 | 2019-01-01T11:00:00Z
U1 | 2019-01-01T11:15:00Z
U1 | 2019-01-01T12:00:00Z
U1 | 2019-01-01T12:20:00Z
U1 | 2019-01-01T15:00:00Z
U2 | 2019-01-01T11:00:00Z
U2 | 2019-01-02T11:00:00Z
U2 | 2019-01-02T11:25:00Z
U2 | 2019-01-02T11:50:00Z
U2 | 2019-01-02T12:15:00Z
U2 | 2019-01-02T12:40:00Z
U2 | 2019-01-02T13:05:00Z
U2 | 2019-01-02T13:20:00Z
Ожидаемый результат
UserId | Click Time | SessionId
-----------------------------------------
U1 | 2019-01-01T11:00:00Z | Session1
U1 | 2019-01-01T11:15:00Z | Session1
U1 | 2019-01-01T12:00:00Z | Session2
U1 | 2019-01-01T12:20:00Z | Session2
U1 | 2019-01-01T15:00:00Z | Session3
U2 | 2019-01-01T11:00:00Z | Session4
U2 | 2019-01-02T11:00:00Z | Session5
U2 | 2019-01-02T11:25:00Z | Session5
U2 | 2019-01-02T11:50:00Z | Session5
U2 | 2019-01-02T12:15:00Z | Session5
U2 | 2019-01-02T12:40:00Z | Session5
U2 | 2019-01-02T13:05:00Z | Session6
U2 | 2019-01-02T13:20:00Z | Session6
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val df = sc.parallelize(List(("U1","2019-01-01T11:00:00Z"),("U1","2019-01-01T11:15:00Z"),("U1","2019-01-01T12:00:00Z"),("U1","2019-01-01T12:20:00Z"),("U1","2019-01-01T15:00:00Z"),("U2","2019-01-01T11:00:00Z"),("U2","2019-01-02T11:00:00Z"),("U2","2019-01-02T11:25:00Z"),("U2","2019-01-02T11:50:00Z"),("U2","2019-01-02T12:15:00Z"),("U2","2019-01-02T12:40:00Z"),("U2","2019-01-02T13:05:00Z"),("U2","2019-01-02T13:20:00Z"))).toDF("UserId","time").withColumn("ClickTime", col("time").cast("timestamp")).drop(col("time"))
val windowSpec = Window.partitionBy("userid").orderBy("clicktime")
val lagWindow = lag(col("clicktime"), 1).over(windowSpec)
val df1 = df.select(col("userid"), col("clicktime"), lagWindow.alias("prevclicktime")).withColumn("timediff", (col("clicktime").cast("long") - col("prevclicktime").cast("long"))).na.fill(Map("timediff" -> 0)).drop(col("prevclicktime"))
df1.show(truncate = false)
Я не могу применить второе условие, так как нам нужно накопить продолжительность сеанса и проверить, что накопленное значение составляет менее 2 часов. Если накопленная продолжительность больше 2 часов, необходимо назначить новый сеанс.
Пожалуйста, помогите мне в этом.
Заранее спасибо ...!