Сгруппируйте по ключу и найдите предыдущую временную метку события, которое произошло в определенном временном окне, эффективно с помощью Spark / Scala - PullRequest
0 голосов
/ 28 апреля 2018

Примечание: Моя группировка может содержать до 5-10 тыс. Строк на группу для агрегации. Поэтому эффективный код крайне желателен.

Мои данные

val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-23 16:58:08|Success|
|   user2|iphone|2017-12-23 16:58:12|Success|
|   user2|iphone|2017-12-23 16:58:20|Success|
|   user2|iphone|2017-12-23 16:58:25|Success|
|   user2|iphone|2017-12-23 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

Что я хочу
Группировка по (имени пользователя, устройству) за последнее время, когда произошло событие.

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

Исключения в желаемом выводе:
Теперь, поскольку я упомянул, он должен находиться в определенном временном окне, например во входном наборе данных ниже, где последняя строка имеет самая поздняя отметка времени 23 декабря. Теперь, если я хочу указать конкретное временное окно возврата на 1 день и дать мне последнюю попытку, столбец 'previous_attempt_at' будет нулевым, поскольку в предыдущий день нет событий, которые должны быть 22 января. Все зависит от диапазона времени ввода.

//Initial Data
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-20 16:58:08|Success|
|   user2|iphone|2017-12-20 16:58:12|Success|
|   user2|iphone|2017-12-20 16:58:20|Success|
|   user2|iphone|2017-12-20 16:58:25|Success|
|   user2|iphone|2017-12-20 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

// Desired Output
A grouping by (username,device) for the latest time an event occurred.

    +--------+------+-------------------+-------+-------------------+
    |username|device|         attempt_at|   stat|previous_attempt_at|
    +--------+------+-------------------+-------+-------------------+
    |   user2|iphone|2017-12-23 16:58:45|Success|               null|
    +--------+------+-------------------+-------+-------------------+

Что у меня есть .

val w = (Window.partitionBy("username", "device")
                 .orderBy(col("attempt_at").cast("timestamp").cast("long"))
                   .rangeBetween(-3600, -1)
                 )

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:08|Success|               null|
|   user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
|   user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
|   user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
|   user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

Примечания . Код, который я имею, делает управление окнами для каждой строки в определенной группе пользователей. Что крайне неэффективно при работе с большими объемами данных, также не дает последней попытки. Мне не нужны все строки, кроме последней.

1 Ответ

0 голосов
/ 28 апреля 2018

Все, что вам нужно, это дополнительные groupBy и aggregation, но перед этим вам понадобится функция collect_list для кумулятивного сбора предыдущих дат и функция udf для проверки предыдущей попытки_ в течение времени ограничить и преобразовать три столбца ("attempt_at", "stat", "previous_attempt_at") в struct для выбора последнего в

import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String])=> {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  val diffDates = timestamps.init.filter(x => LocalDateTime.from(LocalDateTime.parse(x, formatter)).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if(diffDates.size > 0) diffDates.last else null
})

import org.apache.spark.sql.expressions._
val w = Window.partitionBy("username", "device").orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1.withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  .withColumn("struct", struct(col("attempt_at").cast("timeStamp").as("attempt_at"),col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))

Это должно дать вам для последующего примера

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|null               |
+--------+------+---------------------+-------+-------------------+

и следующие для предыдущего ввода d ata

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+

и вы можете изменить логику для часов, изменив функцию ChronoUnit.DAYS in udf на ChronoUnit.HOURS и т. Д.

...