Создайте идентификатор сеанса для данных потока кликов на основе условий в Apache Spark - PullRequest
1 голос
/ 24 мая 2019

Как мы можем сгенерировать уникальный идентификатор сеанса для данных потока кликов, используя кадры данных Spark (Scala) со следующими двумя условиями?

  1. Сеанс истекает через 30 минут бездействия (означает отсутствие данных потока кликов в течение 30 минут)
  2. Сессия остается активной в течение 2 часов. Через 2 часа возобновите сеанс.

Введите:

UserId |    Click Time
-----------------------------
  U1   | 2019-01-01T11:00:00Z
  U1   | 2019-01-01T11:15:00Z
  U1   | 2019-01-01T12:00:00Z
  U1   | 2019-01-01T12:20:00Z
  U1   | 2019-01-01T15:00:00Z
  U2   | 2019-01-01T11:00:00Z
  U2   | 2019-01-02T11:00:00Z
  U2   | 2019-01-02T11:25:00Z
  U2   | 2019-01-02T11:50:00Z
  U2   | 2019-01-02T12:15:00Z
  U2   | 2019-01-02T12:40:00Z
  U2   | 2019-01-02T13:05:00Z
  U2   | 2019-01-02T13:20:00Z

Ожидаемый результат

UserId |    Click Time        | SessionId
-----------------------------------------
  U1   | 2019-01-01T11:00:00Z | Session1
  U1   | 2019-01-01T11:15:00Z | Session1
  U1   | 2019-01-01T12:00:00Z | Session2
  U1   | 2019-01-01T12:20:00Z | Session2
  U1   | 2019-01-01T15:00:00Z | Session3
  U2   | 2019-01-01T11:00:00Z | Session4
  U2   | 2019-01-02T11:00:00Z | Session5
  U2   | 2019-01-02T11:25:00Z | Session5
  U2   | 2019-01-02T11:50:00Z | Session5
  U2   | 2019-01-02T12:15:00Z | Session5
  U2   | 2019-01-02T12:40:00Z | Session5
  U2   | 2019-01-02T13:05:00Z | Session6
  U2   | 2019-01-02T13:20:00Z | Session6
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val df = sc.parallelize(List(("U1","2019-01-01T11:00:00Z"),("U1","2019-01-01T11:15:00Z"),("U1","2019-01-01T12:00:00Z"),("U1","2019-01-01T12:20:00Z"),("U1","2019-01-01T15:00:00Z"),("U2","2019-01-01T11:00:00Z"),("U2","2019-01-02T11:00:00Z"),("U2","2019-01-02T11:25:00Z"),("U2","2019-01-02T11:50:00Z"),("U2","2019-01-02T12:15:00Z"),("U2","2019-01-02T12:40:00Z"),("U2","2019-01-02T13:05:00Z"),("U2","2019-01-02T13:20:00Z"))).toDF("UserId","time").withColumn("ClickTime", col("time").cast("timestamp")).drop(col("time"))
val windowSpec = Window.partitionBy("userid").orderBy("clicktime")
val lagWindow = lag(col("clicktime"), 1).over(windowSpec)
val df1 = df.select(col("userid"), col("clicktime"), lagWindow.alias("prevclicktime")).withColumn("timediff", (col("clicktime").cast("long") - col("prevclicktime").cast("long"))).na.fill(Map("timediff" -> 0)).drop(col("prevclicktime"))
df1.show(truncate = false)

Я не могу применить второе условие, так как нам нужно накопить продолжительность сеанса и проверить, что накопленное значение составляет менее 2 часов. Если накопленная продолжительность больше 2 часов, необходимо назначить новый сеанс.

Пожалуйста, помогите мне в этом.

Заранее спасибо ...!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...