Как сгруппировать записи, которые находятся в пределах определенного интервала времени, используя Spark Scala или sql? - PullRequest
1 голос
/ 28 марта 2019

Я бы хотел сгруппировать записи в scala, только если они имеют одинаковый идентификатор и их время не превышает 1 минуты.

Я думаю, концептуально что-то вроде этого? Но я не совсем уверен

HAVING a.ID = b.ID AND a.time + 30 sec > b.time AND a.time - 30 sec < b.time




| ID         |     volume  |           Time             |
|:-----------|------------:|:--------------------------:|
| 1          |      10     |    2019-02-17T12:00:34Z    |
| 2          |      20     |    2019-02-17T11:10:46Z    |
| 3          |      30     |    2019-02-17T13:23:34Z    |
| 1          |      40     |    2019-02-17T12:01:02Z    |
| 2          |      50     |    2019-02-17T11:10:30Z    |
| 1          |      60     |    2019-02-17T12:01:57Z    |

к этому:

| ID         |     volume  | 
|:-----------|------------:|
| 1          |      50     |   // (10+40)
| 2          |      70     |   // (20+50)
| 3          |      30     |


df.groupBy($"ID", window($"Time", "1 minutes")).sum("volume")

приведенный выше код является 1 решением, но оно всегда округляется.

Например, 2019-02-17T12: 00: 45Z будет иметь диапазон

2019-02-17T12:00:00Z TO 2019-02-17T12:01:00Z.

Я ищу это вместо этого: 2019-02-17T11:45:00Z TO 2019-02-17T12:01:45Z.

Есть ли способ?

1 Ответ

1 голос
/ 28 марта 2019

org.apache.spark.sql.functions обеспечивает перегруженные функции окна, как показано ниже

1.window (timeColumn: Column, windowDuration: String): Создает падающие временные окна с учетом столбца, указывающего метку времени.Начало окна включено, но конец окна является эксклюзивным, например, 12:05 будет в окне [12: 05,12: 10), но не в [12: 00,12: 05).

Окнабудет выглядеть так:

  {{{
    09:00:00-09:01:00
    09:01:00-09:02:00
    09:02:00-09:03:00 ...
  }}}

2.window ((timeColumn: Column, windowDuration: String, slideDuration: String): Объединить строки в одно или несколько окон времени с учетом столбца, указывающего метку времени. Начало окна включительно, но окончания окна являются исключительными, например, 12:05 будетв окне [12: 05,12: 10), но не в [12: 00,12: 05). slideDuration Параметр, задающий интервал скольжения окна, например, 1 minute. Новое окно будет создаваться каждые slideDuration.Должно быть меньше или равно windowDuration.

Окна будут выглядеть так:

{{{
  09:00:00-09:01:00
  09:00:10-09:01:10
  09:00:20-09:01:20 ...
}}}

3.window ((timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Объединить строки в одно или несколько окон времени с учетом столбца, указывающего метку времени. Начало окна включительно, но окончания окна являются исключительными, например, 12: 05 будет в окне [12: 05,12: 10), но не в [12: 00,12: 05).

Окна будут выглядеть так:

{{{
  09:00:05-09:01:05
  09:00:15-09:01:15
  09:00:25-09:01:25 ...
}}}

Например, чтобы иметь ежечасные падающие окна, которые начинаются через 15 минут после часа, например, 12: 15-13: 15, 13: 15-14: 15 ... укажите startTime как 15 minutes. Это идеальная функция перегрузки окна, которая соответствует вашим требованиям.

Рабочий код приведен ниже.

import org.apache.spark.sql.SparkSession

object SparkWindowTest extends App {

  val spark = SparkSession
    .builder()
    .master("local")
    .appName("File_Streaming")
    .getOrCreate()

  import spark.implicits._
  import org.apache.spark.sql.functions._

  //Prepare Test Data
  val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
    (3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
    (1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
    .toDF("ID", "Volume", "TimeString")

  df.show()
  df.printSchema()

+---+------+-------------------+
| ID|Volume|         TimeString|
+---+------+-------------------+
|  1|    10|2019-02-17 12:00:49|
|  2|    20|2019-02-17 11:10:46|
|  3|    30|2019-02-17 13:23:34|
|  2|    50|2019-02-17 11:10:30|
|  1|    40|2019-02-17 12:01:02|
|  1|    60|2019-02-17 12:01:57|
+---+------+-------------------+

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- TimeString: string (nullable = true)

  //Converted String Timestamp into Timestamp
  val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))

  //Dropped String Timestamp from DF
  val modifiedDF1 = modifiedDF.drop("TimeString")

  modifiedDF.show(false)
  modifiedDF.printSchema()

+---+------+-------------------+-------------------+
|ID |Volume|TimeString         |Time               |
+---+------+-------------------+-------------------+
|1  |10    |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2  |20    |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3  |30    |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2  |50    |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1  |40    |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1  |60    |2019-02-17 12:01:57|2019-02-17 12:01:57|
+---+------+-------------------+-------------------+

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- TimeString: string (nullable = true)
 |-- Time: timestamp (nullable = true)

  modifiedDF1.show(false)
  modifiedDF1.printSchema()

+---+------+-------------------+
|ID |Volume|Time               |
+---+------+-------------------+
|1  |10    |2019-02-17 12:00:49|
|2  |20    |2019-02-17 11:10:46|
|3  |30    |2019-02-17 13:23:34|
|2  |50    |2019-02-17 11:10:30|
|1  |40    |2019-02-17 12:01:02|
|1  |60    |2019-02-17 12:01:57|
+---+------+-------------------+

root
 |-- ID: integer (nullable = false)
 |-- Volume: integer (nullable = false)
 |-- Time: timestamp (nullable = true)

  //Main logic
  val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")

  //Renamed all columns of DF.
  val newNames = Seq("ID", "WINDOW", "VOLUME")
  val finalDF = modifiedDF2.toDF(newNames: _*)

  finalDF.show(false)

+---+---------------------------------------------+------+
|ID |WINDOW                                       |VOLUME|
+---+---------------------------------------------+------+
|2  |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50    |
|1  |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60    |
|1  |[2019-02-17 12:00:45.0,2019-02-17 12:01:45.0]|50    |
|3  |[2019-02-17 13:22:45.0,2019-02-17 13:23:45.0]|30    |
|2  |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20    |
+---+---------------------------------------------+------+

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...