Подсчет перекрывающихся сессий в PySpark - PullRequest
0 голосов
/ 04 июня 2019

У меня есть набор данных с журналами действий пользователя, и я сеансизировал их (если пользователь обращается к моей системе с того же IP-адреса с перерывом менее 20 минут, все его действия принадлежат одному сеансу).

В качестве примера, скажем, у нас есть эти данные, которые могут относиться к нескольким организациям и пользователям:

from pyspark.sql.types import TimestampType

df = spark.createDataFrame(
    [
        ("org_1", 'user_1', '2018-12-20 19:55:29', 1, 1),
        ("org_1", 'user_1', '2018-12-20 19:55:30', 1, 0),
        ("org_1", 'user_1', '2018-12-20 19:55:31', 2, 1),
        ("org_1", 'user_1', '2018-12-20 19:55:32', 3, 1),
        ("org_1", 'user_1', '2018-12-20 19:55:33', 4, 1),
        ("org_1", 'user_1', '2018-12-20 19:55:34', 1, 0),
        ("org_1", 'user_1', '2018-12-20 19:55:35', 3, 0),
        ("org_1", 'user_1', '2018-12-20 19:55:36', 3, 0),
        ("org_1", 'user_1', '2018-12-20 19:55:37', 1, 0),
        ("org_1", 'user_1', '2018-12-20 19:55:38', 2, 0), 
    ],
    ("org_id", "user_id", "ymd", "session_id", "new_session")
)

df = df.withColumn("ymd", df['ymd'].cast(TimestampType()))


+------+-------+---------------------+----------+-----------+
|org_id|user_id|ymd                  |session_id|new_session|
+------+-------+---------------------+----------+-----------+
|org_1 |user_1 |2018-12-20 19:55:29.0|1         |1          |
|org_1 |user_1 |2018-12-20 19:55:30.0|1         |0          |
|org_1 |user_1 |2018-12-20 19:55:31.0|2         |1          |
|org_1 |user_1 |2018-12-20 19:55:32.0|3         |1          |
|org_1 |user_1 |2018-12-20 19:55:33.0|4         |1          |
|org_1 |user_1 |2018-12-20 19:55:34.0|1         |0          |
|org_1 |user_1 |2018-12-20 19:55:35.0|3         |0          |
|org_1 |user_1 |2018-12-20 19:55:36.0|3         |0          |
|org_1 |user_1 |2018-12-20 19:55:37.0|1         |0          |
|org_1 |user_1 |2018-12-20 19:55:38.0|2         |0          |
+------+-------+---------------------+----------+-----------+

Я хочу иметь возможность подсчитывать количество перекрывающихся сеансов на организацию / пользователя / день. В этом случае результат будет 6, поскольку все 4 сеанса перекрываются друг с другом.

Я думал запустить оконную функцию HIVE, разбитую по org, user, day и каким-то образом посчитать число session_ids, которое больше текущего session_id (что означает, что они перекрываются). Вот пример того, что я описываю:

+------+-------+---------------------+----------+-----------+--------------------+
|org_id|user_id|ymd                  |session_id|new_session|overlapping_sessions|
+------+-------+---------------------+----------+-----------+--------------------+
|org_1 |user_1 |2018-12-20 19:55:29.0|1         |1          |0
|org_1 |user_1 |2018-12-20 19:55:30.0|1         |0          |0
|org_1 |user_1 |2018-12-20 19:55:31.0|2         |1          |0
|org_1 |user_1 |2018-12-20 19:55:32.0|3         |1          |0
|org_1 |user_1 |2018-12-20 19:55:33.0|4         |1          |0
|org_1 |user_1 |2018-12-20 19:55:34.0|1         |0          | <- 3 (sessions 2, 3, 4 are above)    
|org_1 |user_1 |2018-12-20 19:55:35.0|3         |0          | <- 1 (session 4 is above)
|org_1 |user_1 |2018-12-20 19:55:36.0|3         |0          |0 (since we've examined session_3 already)
|org_1 |user_1 |2018-12-20 19:55:37.0|1         |0          |0 (since we've examined session_1 already)
|org_1 |user_1 |2018-12-20 19:55:38.0|2         |0          | <- 2 (sessions 3, 4 are above)
+------+-------+---------------------+----------+-----------+--------------------+

Тогда я могу просто подытожить столбец overlapping_sessions.

Я не могу понять, как этого добиться, используя spark.sql. Есть указатели?

...