Примечание: Моя группировка может содержать до 5-10 тыс. Строк на группу для агрегации. Поэтому эффективный код крайне желателен.
Мои данные
val df1 = sc.parallelize(Seq(
("user2", "iphone", "2017-12-23 16:58:08", "Success"),
("user2", "iphone", "2017-12-23 16:58:12", "Success"),
("user2", "iphone", "2017-12-23 16:58:20", "Success"),
("user2", "iphone", "2017-12-23 16:58:25", "Success"),
("user2", "iphone", "2017-12-23 16:58:35", "Success"),
("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-23 16:58:08|Success|
| user2|iphone|2017-12-23 16:58:12|Success|
| user2|iphone|2017-12-23 16:58:20|Success|
| user2|iphone|2017-12-23 16:58:25|Success|
| user2|iphone|2017-12-23 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
Что я хочу
Группировка по (имени пользователя, устройству) за последнее время, когда произошло событие.
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Исключения в желаемом выводе:
Теперь, поскольку я упомянул, он должен находиться в определенном временном окне, например во входном наборе данных ниже, где последняя строка имеет
самая поздняя отметка времени 23 декабря. Теперь, если я хочу указать конкретное временное окно возврата на 1 день и дать мне последнюю попытку, столбец 'previous_attempt_at' будет нулевым, поскольку в предыдущий день нет событий, которые должны быть 22 января. Все зависит от диапазона времени ввода.
//Initial Data
+--------+------+-------------------+-------+
|username|device| attempt_at| stat|
+--------+------+-------------------+-------+
| user2|iphone|2017-12-20 16:58:08|Success|
| user2|iphone|2017-12-20 16:58:12|Success|
| user2|iphone|2017-12-20 16:58:20|Success|
| user2|iphone|2017-12-20 16:58:25|Success|
| user2|iphone|2017-12-20 16:58:35|Success|
| user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+
// Desired Output
A grouping by (username,device) for the latest time an event occurred.
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:45|Success| null|
+--------+------+-------------------+-------+-------------------+
Что у меня есть .
val w = (Window.partitionBy("username", "device")
.orderBy(col("attempt_at").cast("timestamp").cast("long"))
.rangeBetween(-3600, -1)
)
val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))
+--------+------+-------------------+-------+-------------------+
|username|device| attempt_at| stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
| user2|iphone|2017-12-23 16:58:08|Success| null|
| user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
| user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
| user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
| user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
| user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+
Примечания .
Код, который я имею, делает управление окнами для каждой строки в определенной группе пользователей.
Что крайне неэффективно при работе с большими объемами данных, также не дает последней попытки. Мне не нужны все строки, кроме последней.