Идея здесь будет заключаться в том, чтобы определить «когда» это новый сеанс (Если я хорошо понял, это когда lag(login_status, 1) !== login_status && login_status
, определите его как int и суммируйте по ним, чтобы дать идентификатор «сеансам».
Допустим, что это должен делать простой групповой.
Давайте сделаем это с оконными функциями!
// Some needed imports
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, min, sum}
// Our df
val df = Seq((534, false, "06-06-2019 12:12:30", 0),
(534, false, "06-06-2019 12:12:35", 0),
(534, true, "06-06-2019 12:17:30", 1),
(534, true, "06-06-2019 12:18:35", 3),
(534, false, "06-06-2019 12:19:35", 0),
(534, false, "06-06-2019 12:20:35", 0),
(534, true, "06-06-2019 12:21:30", 8),
(534, true, "06-06-2019 12:22:35", 7),
(534, true, "06-06-2019 12:23:30", 1),
(534, false, "06-06-2019 12:14:35", 0)).toDF("user_id", "login_status", "timestamp", "page_id")
// The window on which we will lag over to define a new session
val userSessionWindow = Window.partitionBy("user_id").orderBy("timestamp")
// Our window to give ids to our sessions
val userWindow = Window.partitionBy("user_id", "login_status").orderBy("timestamp")
// This is how we define a new session, you can change it as you want
val newSession = ('login_status !== lag('login_status, 1).over(userSessionWindow) && 'login_status).cast("bigint")
df.withColumn("new_session", newSession).show
Итак, здесь, для каждого нового сеанса мы получаем номер 1, этотвыглядит великолепно!
+-------+------------+-------------------+-------+-----------+
|user_id|login_status| timestamp|page_id|new_session|
+-------+------------+-------------------+-------+-----------+
| 534| false|06-06-2019 12:12:30| 0| 0|
| 534| false|06-06-2019 12:12:35| 0| 0|
| 534| false|06-06-2019 12:14:35| 0| 0|
| 534| true|06-06-2019 12:17:30| 1| 1|
| 534| true|06-06-2019 12:18:35| 3| 0|
| 534| false|06-06-2019 12:19:35| 0| 0|
| 534| false|06-06-2019 12:20:35| 0| 0|
| 534| true|06-06-2019 12:21:30| 8| 1|
| 534| true|06-06-2019 12:22:35| 7| 0|
| 534| true|06-06-2019 12:23:30| 1| 0|
+-------+------------+-------------------+-------+-----------+
Давайте определим идентификаторы для нашего сеанса со скользящей суммой.
val withSessionIDs = df.withColumn("session", sum(newSession).over(userWindow))
withSessionIDs.show
+-------+------------+-------------------+-------+-------+
|user_id|login_status| timestamp|page_id|session|
+-------+------------+-------------------+-------+-------+
| 534| false|06-06-2019 12:12:30| 0| 0|
| 534| false|06-06-2019 12:12:35| 0| 0|
| 534| false|06-06-2019 12:14:35| 0| 0|
| 534| false|06-06-2019 12:19:35| 0| 0|
| 534| false|06-06-2019 12:20:35| 0| 0|
| 534| true|06-06-2019 12:17:30| 1| 1|
| 534| true|06-06-2019 12:18:35| 3| 1|
| 534| true|06-06-2019 12:21:30| 8| 2|
| 534| true|06-06-2019 12:22:35| 7| 2|
| 534| true|06-06-2019 12:23:30| 1| 2|
+-------+------------+-------------------+-------+-------+
Теперь у нас есть идентификатор или сеанс, мы можем просто сделать групповую операцию!
// Do not forget to remove the 0 session, which stands for every user not logged in
withSessionIDs.groupBy("user_id", "session").agg(count("*").as("page_visited"), first("timestamp").as("first_login")).where($"session" !== 0).drop("session").show
+-------+------------+-------------------+
|user_id|page_visited| first_login|
+-------+------------+-------------------+
| 534| 2|06-06-2019 12:17:30|
| 534| 3|06-06-2019 12:21:30|
+-------+------------+-------------------+