Я не очень знаком в Java, поэтому я попытался объяснить это в Scala.
Ключ, чтобы узнать час максимальных маршрутов в день на одного поставщика, заключается в подсчете (vendor, day, hour)
, затем агрегируйте по (vendor, day)
, чтобы рассчитать час, соответствующий максимальному центу каждой группы. day
и hour
каждой записи могут быть проанализированы как start_datetime
.
val df = spark.createDataset(Seq(
("17534","A","1/1/2013 12:00","1/1/2013 12:14",840),
("68346","A","1/1/2013 12:13","1/1/2013 12:18",300),
("09967","B","1/1/2013 12:34","1/1/2013 12:39",300),
("09967","B","1/1/2013 12:44","1/1/2013 12:51",420),
("09967","A","1/1/2013 12:54","1/1/2013 12:56",120)
)).toDF("id","vendor","start_datetime","end_datetime","trip_duration_in_sec")
df.rdd.map(t => {
val vendor = t(1)
val day = t(2).toString.split(" ")(0)
val hour = t(2).toString.split(" ")(1).split(":")(0)
((vendor, day, hour), 1)
})
// count by key
.aggregateByKey(0)((x: Int, y: Int) =>x+y, (x: Int, y: Int) =>x+y)
.map(t => {
val ((vendor, day, hour), cnt) = t;
((vendor, day), (hour, cnt))
})
// solve the max cnt by key (vendor, day)
.foldByKey(("", 0))((z: (String, Int), i: (String, Int)) => if (i._2 > z._2) i else z)
.foreach(t => println(s"${t._1._2} - Vendor ${t._1._1} has ${t._2._2} bus routes from ${t._2._1}:00 hour."))