условный подсчет в искре - PullRequest
1 голос
/ 11 июня 2019

Я не хочу считать количество посещений страницы пользователем в сеансе, здесь моя проблема в том, что у пользователя может быть несколько сеансов в день, и у меня user_id, login_status и отметка времени, как указано ниже.

user_id  login_status     timestamp               page_id
534        False        06-06-2019 12:12:30       0
534        False        06-06-2019 12:12:35       0
534        True         06-06-2019 12:17:30       1
534        True         06-06-2019 12:18:35       3
534        False        06-06-2019 12:19:35       0
534        False        06-06-2019 12:20:35       0
534        True         06-06-2019 12:21:30       8
534        True         06-06-2019 12:22:35       7
534        True         06-06-2019 12:23:30       1
534        False        06-06-2019 12:14:35       0

Excepted Output: -

user_id      timestamp               Page_count 
534         06-06-2019 12:17:30       2
534         06-06-2019 12:21:30       3

вход в систему начинается, когда статус становится истинным, и выход из системы, когда статус ложный.Пожалуйста, помогите, как я могу посчитать посещение страницы за один сеанс.Заранее спасибо.

1 Ответ

1 голос
/ 11 июня 2019

Идея здесь будет заключаться в том, чтобы определить «когда» это новый сеанс (Если я хорошо понял, это когда lag(login_status, 1) !== login_status && login_status, определите его как int и суммируйте по ним, чтобы дать идентификатор «сеансам».

Допустим, что это должен делать простой групповой.

Давайте сделаем это с оконными функциями!

// Some needed imports
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, min, sum}

// Our df
val df = Seq((534, false, "06-06-2019 12:12:30", 0),
(534, false, "06-06-2019 12:12:35", 0),
(534, true, "06-06-2019 12:17:30", 1),
(534, true, "06-06-2019 12:18:35", 3),
(534, false, "06-06-2019 12:19:35", 0),
(534, false, "06-06-2019 12:20:35", 0),
(534, true, "06-06-2019 12:21:30", 8),
(534, true, "06-06-2019 12:22:35", 7),
(534, true, "06-06-2019 12:23:30", 1),
(534, false, "06-06-2019 12:14:35", 0)).toDF("user_id", "login_status", "timestamp", "page_id")

// The window on which we will lag over to define a new session
val userSessionWindow = Window.partitionBy("user_id").orderBy("timestamp")
// Our window to give ids to our sessions
val userWindow = Window.partitionBy("user_id", "login_status").orderBy("timestamp")

// This is how we define a new session, you can change it as you want
val newSession = ('login_status !== lag('login_status, 1).over(userSessionWindow) && 'login_status).cast("bigint")

df.withColumn("new_session", newSession).show

Итак, здесь, для каждого нового сеанса мы получаем номер 1, этотвыглядит великолепно!

+-------+------------+-------------------+-------+-----------+
|user_id|login_status|          timestamp|page_id|new_session|
+-------+------------+-------------------+-------+-----------+
|    534|       false|06-06-2019 12:12:30|      0|          0|
|    534|       false|06-06-2019 12:12:35|      0|          0|
|    534|       false|06-06-2019 12:14:35|      0|          0|
|    534|        true|06-06-2019 12:17:30|      1|          1|
|    534|        true|06-06-2019 12:18:35|      3|          0|
|    534|       false|06-06-2019 12:19:35|      0|          0|
|    534|       false|06-06-2019 12:20:35|      0|          0|
|    534|        true|06-06-2019 12:21:30|      8|          1|
|    534|        true|06-06-2019 12:22:35|      7|          0|
|    534|        true|06-06-2019 12:23:30|      1|          0|
+-------+------------+-------------------+-------+-----------+

Давайте определим идентификаторы для нашего сеанса со скользящей суммой.

val withSessionIDs = df.withColumn("session", sum(newSession).over(userWindow))

withSessionIDs.show
+-------+------------+-------------------+-------+-------+
|user_id|login_status|          timestamp|page_id|session|
+-------+------------+-------------------+-------+-------+
|    534|       false|06-06-2019 12:12:30|      0|      0|
|    534|       false|06-06-2019 12:12:35|      0|      0|
|    534|       false|06-06-2019 12:14:35|      0|      0|
|    534|       false|06-06-2019 12:19:35|      0|      0|
|    534|       false|06-06-2019 12:20:35|      0|      0|
|    534|        true|06-06-2019 12:17:30|      1|      1|
|    534|        true|06-06-2019 12:18:35|      3|      1|
|    534|        true|06-06-2019 12:21:30|      8|      2|
|    534|        true|06-06-2019 12:22:35|      7|      2|
|    534|        true|06-06-2019 12:23:30|      1|      2|
+-------+------------+-------------------+-------+-------+

Теперь у нас есть идентификатор или сеанс, мы можем просто сделать групповую операцию!

// Do not forget to remove the 0 session, which stands for every user not logged in
withSessionIDs.groupBy("user_id", "session").agg(count("*").as("page_visited"), first("timestamp").as("first_login")).where($"session" !== 0).drop("session").show
+-------+------------+-------------------+
|user_id|page_visited|        first_login|
+-------+------------+-------------------+
|    534|           2|06-06-2019 12:17:30|
|    534|           3|06-06-2019 12:21:30|
+-------+------------+-------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...