Я хочу найти последний / предыдущий раз, когда предпринималась попытка входа в систему от определенного пользователя (пользователя и устройства) на основе окна отметки времени.
For example my initial dataset looks like this:
+--------+-------+-------------------+-------+
|username| device| attempt_at| stat|
+--------+-------+-------------------+-------+
| user1| pc|2018-01-02 07:44:27| failed|
| user1| pc|2018-01-02 07:44:10|Success|
| user2| iphone|2017-12-23 16:58:08|Success|
| user2| iphone|2017-12-23 16:58:30|Success|
| user2| iphone|2017-12-23 16:58:50| failed|
| user1|android|2018-01-02 07:44:37| failed|
| user1|android|2018-01-05 08:33:47| failed|
+--------+-------+-------------------+-------+
//code
val df1 = sc.parallelize(Seq(
("user1", "pc", "2018-01-02 07:44:27", "failed"),
("user1", "pc", "2018-01-02 07:44:10", "Success"),
("user2", "iphone", "2017-12-23 16:58:08", "Success"),
("user2", "iphone", "2017-12-23 16:58:30", "Success"),
("user2", "iphone", "2017-12-23 16:58:50", "failed"),
("user1", "android", "2018-01-02 07:44:37", "failed"),
("user1", "android", "2018-01-05 08:33:47", "failed")
)).toDF("username", "device", "attempt_at", "stat")
Что я хочу
Окна 1 час и 7 дней, где я могу найти предыдущие попытки в метке времени для каждого конкретного пользователя и устройства. В основном сгруппированы по пользователю и устройству.
Например: для 'user1' и устройства 'pc' для вышеуказанного набора данных предыдущей попыткой для окна продолжительностью 1 час и 7 дней будет '2018-01-02 07:44:27'.
Но для устройства 'android' для user1 предыдущей попыткой в течение 7 дней будет '2018-01-02 07:44:27', но ничего для окна в течение 1 часа, поскольку в течение последних 1 часа в течение 1 часа не было попыток user1 от андроида.
Ожидаемые наборы выходных данных
// 1 hr window for last known attempt
+--------+-------+---------------------+--------------------+
|username| device| attempt_at| previous_attempt_at|
+--------+-------+---------------------+--------------------+
| user1| pc| 2018-01-02 07:44:10| 2018-01-02 07:44:27|
| user2| iphone| 2017-12-23 16:58:50| 2017-12-23 16:58:30|
+--------+-------+---------------------+--------------------+
// 7 days window for last known attempt
+--------+--------+---------------------+--------------------+
|username| device | attempt_at| previous_attempt_at|
+--------+--------+---------------------+--------------------+
| user1| pc | 2018-01-02 07:44:10| 2018-01-02 07:44:27|
| user1| android| 2018-01-05 08:33:47| 2018-01-02 07:44:37|
| user2| iphone| 2017-12-23 16:58:50| 2017-12-23 16:58:30|
+--------+--------+---------------------+--------------------+
Что я пробовал:
Я пытался использовать окно в течение 1 часа, используя «последний». Это дает текущую временную метку строк, но не предыдущую, основанную на окне.
val w = (Window.partitionBy("username", "device")
.orderBy(col("attempt_at").cast("timestamp").cast("long"))
.rangeBetween(-3600, 0)
)
val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))