У меня есть набор данных с журналами действий пользователя, и я сеансизировал их (если пользователь обращается к моей системе с того же IP-адреса с перерывом менее 20 минут, все его действия принадлежат одному сеансу).
В качестве примера, скажем, у нас есть эти данные, которые могут относиться к нескольким организациям и пользователям:
from pyspark.sql.types import TimestampType
df = spark.createDataFrame(
[
("org_1", 'user_1', '2018-12-20 19:55:29', 1, 1),
("org_1", 'user_1', '2018-12-20 19:55:30', 1, 0),
("org_1", 'user_1', '2018-12-20 19:55:31', 2, 1),
("org_1", 'user_1', '2018-12-20 19:55:32', 3, 1),
("org_1", 'user_1', '2018-12-20 19:55:33', 4, 1),
("org_1", 'user_1', '2018-12-20 19:55:34', 1, 0),
("org_1", 'user_1', '2018-12-20 19:55:35', 3, 0),
("org_1", 'user_1', '2018-12-20 19:55:36', 3, 0),
("org_1", 'user_1', '2018-12-20 19:55:37', 1, 0),
("org_1", 'user_1', '2018-12-20 19:55:38', 2, 0),
],
("org_id", "user_id", "ymd", "session_id", "new_session")
)
df = df.withColumn("ymd", df['ymd'].cast(TimestampType()))
+------+-------+---------------------+----------+-----------+
|org_id|user_id|ymd |session_id|new_session|
+------+-------+---------------------+----------+-----------+
|org_1 |user_1 |2018-12-20 19:55:29.0|1 |1 |
|org_1 |user_1 |2018-12-20 19:55:30.0|1 |0 |
|org_1 |user_1 |2018-12-20 19:55:31.0|2 |1 |
|org_1 |user_1 |2018-12-20 19:55:32.0|3 |1 |
|org_1 |user_1 |2018-12-20 19:55:33.0|4 |1 |
|org_1 |user_1 |2018-12-20 19:55:34.0|1 |0 |
|org_1 |user_1 |2018-12-20 19:55:35.0|3 |0 |
|org_1 |user_1 |2018-12-20 19:55:36.0|3 |0 |
|org_1 |user_1 |2018-12-20 19:55:37.0|1 |0 |
|org_1 |user_1 |2018-12-20 19:55:38.0|2 |0 |
+------+-------+---------------------+----------+-----------+
Я хочу иметь возможность подсчитывать количество перекрывающихся сеансов на организацию / пользователя / день. В этом случае результат будет 6
, поскольку все 4 сеанса перекрываются друг с другом.
Я думал запустить оконную функцию HIVE, разбитую по org, user, day и каким-то образом посчитать число session_ids, которое больше текущего session_id (что означает, что они перекрываются). Вот пример того, что я описываю:
+------+-------+---------------------+----------+-----------+--------------------+
|org_id|user_id|ymd |session_id|new_session|overlapping_sessions|
+------+-------+---------------------+----------+-----------+--------------------+
|org_1 |user_1 |2018-12-20 19:55:29.0|1 |1 |0
|org_1 |user_1 |2018-12-20 19:55:30.0|1 |0 |0
|org_1 |user_1 |2018-12-20 19:55:31.0|2 |1 |0
|org_1 |user_1 |2018-12-20 19:55:32.0|3 |1 |0
|org_1 |user_1 |2018-12-20 19:55:33.0|4 |1 |0
|org_1 |user_1 |2018-12-20 19:55:34.0|1 |0 | <- 3 (sessions 2, 3, 4 are above)
|org_1 |user_1 |2018-12-20 19:55:35.0|3 |0 | <- 1 (session 4 is above)
|org_1 |user_1 |2018-12-20 19:55:36.0|3 |0 |0 (since we've examined session_3 already)
|org_1 |user_1 |2018-12-20 19:55:37.0|1 |0 |0 (since we've examined session_1 already)
|org_1 |user_1 |2018-12-20 19:55:38.0|2 |0 | <- 2 (sessions 3, 4 are above)
+------+-------+---------------------+----------+-----------+--------------------+
Тогда я могу просто подытожить столбец overlapping_sessions.
Я не могу понять, как этого добиться, используя spark.sql
. Есть указатели?