Выберите последнюю запись метки времени после оконной операции для каждой группы данных с помощью Spark Scala - PullRequest
0 голосов
/ 29 апреля 2018

Я выполнил подсчет попыток (пользователь, приложение) за временной промежуток дня (86400). Я хочу извлечь строки с последней отметкой времени с количеством и удалить ненужные предыдущие значения. Убедитесь, что ваш ответ учитывает временное окно. Один пользователь с 1 устройством может делать несколько попыток в день или неделю, я хочу иметь возможность извлекать эти конкретные моменты с окончательным счетом в каждом конкретном окне.

Мой начальный набор данных выглядит так:

val df = sc.parallelize(Seq(
  ("user1", "iphone", "2017-12-22 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-22 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 09:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 09:15:12",  "failed"),
  ("user1", "android", "2017-12-20 09:25:20", "Success"),
  ("user1", "android", "2017-12-20 09:44:22", "Success"),
  ("user1", "android", "2017-12-20 09:58:22", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:20", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:25", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")

Код, который я запустил и что я получил .

// Basically I'm looking 1 day which is 86400 seconds
val w1 = Window.partitionBy("username", "device")
               .orderBy(col("date_time").cast("date_time").cast("long").desc)
               .rangeBetween(-86400, 0) 


val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))

Теперь у меня есть

// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|   user1|       android|  2017-12-20 09:58:22|Success|       1|
|   user1|       android|  2017-12-20 09:44:22|Success|       2|
|   user1|       android|  2017-12-20 09:25:20|Success|       3|
|   user1|        iphone|  2017-12-22 12:06:18|Success|       1|
|   user1|        iphone|  2017-12-22 11:15:12| failed|       2|
|   user1|        iphone|  2017-12-22 10:06:18|Success|       3|
|   user1|        iphone|  2017-12-22 09:15:12| failed|       4|
|   user1|        iphone|  2017-12-20 16:44:35|Success|       1|
|   user1|        iphone|  2017-12-20 16:44:25|Success|       2|
|   user1|        iphone|  2017-12-20 16:44:20|Success|       3|
|   user1|        iphone|  2017-12-20 12:06:18|Success|       4|
|   user1|        iphone|  2017-12-20 11:15:12| failed|       5|
|   user1|        iphone|  2017-12-20 10:06:18|Success|       6|
|   user1|        iphone|  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+

Что я хочу . Поэтому я хочу получить последнюю метку времени вместе с ее счетом, убедившись, что я в одном и том же временном окне.

+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|  user1     |       android    |  2017-12-20 09:25:20|Success|       3|
|  user1     |        iphone    |  2017-12-22 09:15:12| failed|       4|
|  user1     |        iphone    |  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+**

1 Ответ

0 голосов
/ 29 апреля 2018

Вы почти там . Вы вычислили счет, посмотрев на дневной диапазон. Теперь все, что вам нужно сделать, это выяснить последнюю запись в этом однодневном диапазоне, что можно сделать, используя функцию last для той же самой оконной функции, но с обратным диапазоном .

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

def day(x: Int) = x * 86400

val w1 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(0, day(1))

val countEveryAttemptDF = df.withColumn("attempts", count("application_id").over(w1))
                            .withColumn("att", last("attempts").over(w2))
                            .filter(col("attempts") === col("att"))
                            .drop("att")

что должно дать вам

+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1   |android       |2017-12-20 09:25:20  |Success|3       |
|user1   |iphone        |2017-12-22 09:15:12  | Failed|4       |
|user1   |iphone        |2017-12-20 09:15:12  | Failed|7       |
+--------+--------------+---------------------+-------+--------+

так же, как указано в комментариях ниже

86400 секунд в 1 дне. Я хотел оглянуться назад на 1 день. Точно так же 3600 секунд это 1 час. И 604 800 секунд за 1 неделю

Вы можете изменить дневную функцию на часы и недели, как показано ниже, и использовать их в окне rangeBetween

def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800

Надеюсь, ответ полезен

...