как использовать искровую задержку и опережение по группам и упорядочить по - PullRequest
0 голосов
/ 01 мая 2018

я использую: `

dataset.withColumn("lead",lead(dataset.col(start_date),1).over(orderBy(start_date)));

` я просто хочу добавить группу по trackId, так что ведите работу над каждой группой как любую функцию agg:

+----------+---------------------------------------------+
|  trackId |  start_time    |  end_time   |      lead    |
+-----+--------------------------------------------------+
|  1       | 12:00:00       |   12:04:00  |     12:05:00 |
+----------+---------------------------------------------+
|  1       | 12:05:00       |   12:08:00  |    12:20:00  |  
+----------+---------------------------------------------+
|  1       | 12:20:00       |   12:22:00  |     null     | 
+----------+---------------------------------------------+
|  2       | 13:00:00       |   13:04:00  |    13:05:00 |
+----------+---------------------------------------------+
|  2       | 13:05:00       |   13:08:00  |    13:20:00  |  
+----------+---------------------------------------------+
|  2       | 13:20:00       |   13:22:00  |     null     | 
+----------+---------------------------------------------+

любая помощь, как это сделать?

Ответы [ 2 ]

0 голосов
/ 01 мая 2018

Вам нужно использовать Window

val df = Seq(
  (1, "12:00:00", "12:04:00"),
  (1, "12:05:00", "12:08:00"),
  (1, "12:20:00", "12:22:00"),
  (2, "13:00:00", "13:04:00"),
  (2, "13:05:00", "13:08:00"),
  (2, "13:20:00", "13:22:00")
).toDF( "trackId","start_time","end_time" )

val window  = Window.partitionBy("trackId").orderBy("start_time")

df.withColumn("lead",lead(col("start_time"),1).over(window))

Если вы не хотите иметь значение null, вы можете также передать значение по умолчанию как lead($"start_time",1, defaultValue)

Результат:

+-------+----------+--------+--------+
|trackId|start_time|end_time|lead    |
+-------+----------+--------+--------+
|1      |12:00:00  |12:04:00|12:05:00|
|1      |12:05:00  |12:08:00|12:20:00|
|1      |12:20:00  |12:22:00|null    |
|2      |13:00:00  |13:04:00|13:05:00|
|2      |13:05:00  |13:08:00|13:20:00|
|2      |13:20:00  |13:22:00|null    |
+-------+----------+--------+--------+
0 голосов
/ 01 мая 2018

Все, что вам не хватает, это ключевое слово Window и partitionBy вызов метода

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
dataset.withColumn("lead",lead(col("start_time"),1).over(Window.partitionBy("trackId").orderBy("start_time")))
...