org.apache.spark.sql.functions
обеспечивает перегруженные функции окна, как показано ниже
1.window (timeColumn: Column, windowDuration: String): Создает падающие временные окна с учетом столбца, указывающего метку времени.Начало окна включено, но конец окна является эксклюзивным, например, 12:05 будет в окне [12: 05,12: 10), но не в [12: 00,12: 05).
Окнабудет выглядеть так:
{{{
09:00:00-09:01:00
09:01:00-09:02:00
09:02:00-09:03:00 ...
}}}
2.window ((timeColumn: Column, windowDuration: String, slideDuration: String): Объединить строки в одно или несколько окон времени с учетом столбца, указывающего метку времени. Начало окна включительно, но окончания окна являются исключительными, например, 12:05 будетв окне [12: 05,12: 10), но не в [12: 00,12: 05). slideDuration Параметр, задающий интервал скольжения окна, например, 1 minute
. Новое окно будет создаваться каждые slideDuration
.Должно быть меньше или равно windowDuration
.
Окна будут выглядеть так:
{{{
09:00:00-09:01:00
09:00:10-09:01:10
09:00:20-09:01:20 ...
}}}
3.window ((timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Объединить строки в одно или несколько окон времени с учетом столбца, указывающего метку времени. Начало окна включительно, но окончания окна являются исключительными, например, 12: 05 будет в окне [12: 05,12: 10), но не в [12: 00,12: 05).
Окна будут выглядеть так:
{{{
09:00:05-09:01:05
09:00:15-09:01:15
09:00:25-09:01:25 ...
}}}
Например, чтобы иметь ежечасные падающие окна, которые начинаются через 15 минут после часа, например, 12: 15-13: 15, 13: 15-14: 15 ... укажите startTime
как 15 minutes
. Это идеальная функция перегрузки окна, которая соответствует вашим требованиям.
Рабочий код приведен ниже.
import org.apache.spark.sql.SparkSession
object SparkWindowTest extends App {
val spark = SparkSession
.builder()
.master("local")
.appName("File_Streaming")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
//Prepare Test Data
val df = Seq((1, 10, "2019-02-17 12:00:49"), (2, 20, "2019-02-17 11:10:46"),
(3, 30, "2019-02-17 13:23:34"),(2, 50, "2019-02-17 11:10:30"),
(1, 40, "2019-02-17 12:01:02"), (1, 60, "2019-02-17 12:01:57"))
.toDF("ID", "Volume", "TimeString")
df.show()
df.printSchema()
+---+------+-------------------+
| ID|Volume| TimeString|
+---+------+-------------------+
| 1| 10|2019-02-17 12:00:49|
| 2| 20|2019-02-17 11:10:46|
| 3| 30|2019-02-17 13:23:34|
| 2| 50|2019-02-17 11:10:30|
| 1| 40|2019-02-17 12:01:02|
| 1| 60|2019-02-17 12:01:57|
+---+------+-------------------+
root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)
//Converted String Timestamp into Timestamp
val modifiedDF = df.withColumn("Time", to_timestamp($"TimeString"))
//Dropped String Timestamp from DF
val modifiedDF1 = modifiedDF.drop("TimeString")
modifiedDF.show(false)
modifiedDF.printSchema()
+---+------+-------------------+-------------------+
|ID |Volume|TimeString |Time |
+---+------+-------------------+-------------------+
|1 |10 |2019-02-17 12:00:49|2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|2019-02-17 12:01:57|
+---+------+-------------------+-------------------+
root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- TimeString: string (nullable = true)
|-- Time: timestamp (nullable = true)
modifiedDF1.show(false)
modifiedDF1.printSchema()
+---+------+-------------------+
|ID |Volume|Time |
+---+------+-------------------+
|1 |10 |2019-02-17 12:00:49|
|2 |20 |2019-02-17 11:10:46|
|3 |30 |2019-02-17 13:23:34|
|2 |50 |2019-02-17 11:10:30|
|1 |40 |2019-02-17 12:01:02|
|1 |60 |2019-02-17 12:01:57|
+---+------+-------------------+
root
|-- ID: integer (nullable = false)
|-- Volume: integer (nullable = false)
|-- Time: timestamp (nullable = true)
//Main logic
val modifiedDF2 = modifiedDF1.groupBy($"ID", window($"Time", "1 minutes","1 minutes","45 seconds")).sum("Volume")
//Renamed all columns of DF.
val newNames = Seq("ID", "WINDOW", "VOLUME")
val finalDF = modifiedDF2.toDF(newNames: _*)
finalDF.show(false)
+---+---------------------------------------------+------+
|ID |WINDOW |VOLUME|
+---+---------------------------------------------+------+
|2 |[2019-02-17 11:09:45.0,2019-02-17 11:10:45.0]|50 |
|1 |[2019-02-17 12:01:45.0,2019-02-17 12:02:45.0]|60 |
|1 |[2019-02-17 12:00:45.0,2019-02-17 12:01:45.0]|50 |
|3 |[2019-02-17 13:22:45.0,2019-02-17 13:23:45.0]|30 |
|2 |[2019-02-17 11:10:45.0,2019-02-17 11:11:45.0]|20 |
+---+---------------------------------------------+------+
}