Я пытался использовать функцию ведущего окна. Я упорядочил фрейм данных, разделив их на станции и даты и упорядочив их по часам, и рассчитал разницу с предыдущим часом. Если мы рассматриваем 4 часа подряд, в колонке разностей должно быть три единицы. Чтобы выяснить это, я собрал все различия, основанные на станции и дате, и проверил, содержит ли он «1 1 1». Код для того же самого показан ниже. Я надеюсь, что это полезно.
//Creating Test Data
val df = Seq(("Roma",2.2,"2018-10-02",1 )
, ("Roma",1.5,"2018-10-02",2 )
, ("Roma",1.4,"2018-10-02",3 )
, ("Roma",1.4,"2018-10-02",4 )
, ("Milano",0.6,"2018-11-02",12 )
, ("Milano",1.0,"2018-11-02",13 )
, ("Napoli",0.3,"2018-12-02",20 )
, ("Napoli",0.0,"2018-12-02",21 )
, ("Napoli",1.8,"2018-12-02",4 )
, ("Napoli",2.0,"2018-12-03",5 )
, ("Napoli",1.8,"2018-12-03",6))
.toDF("station", "temp", "dateS", "hour")
val filterDF = df.withColumn("hour_lead", lead($"hour", 1)
.over(Window.partitionBy("station","dateS")
.orderBy(col("hour")))
.filter($"hour_lead".isNotNull)
.withColumn("hour_diff", $"hour_lead" - $"hour")
.groupBy("station","dateS")
.agg(collect_list($"hour_diff".cast("string")).as("hour_diff_list"))
.withColumn("hour_diff_list_str",
concat(lit(" "),
concat_ws(" ", $"hour_diff_list"),
lit(" ")))
.filter($"hour_diff_list_str".contains(" 1 1 1 "))
filterDF.show(false)
+-------+----------+--------------+------------------+
|station|dateS |hour_diff_list|hour_diff_list_str|
+-------+----------+--------------+------------------+
|Roma |2018-10-02|[1, 1, 1] | 1 1 1 |
+-------+----------+--------------+------------------+