Вы можете решить эту задачу с помощью AggregateFunction:
// For each key (tuple field 0) and each 10-second window, counts how many times
// every distinct value of the second tuple field occurs.
source
  .keyBy(0)
  .timeWindow(Time.seconds(10L))
  .aggregate(new AggregateFunction[(String, String), (String, Map[String, Int]), (String, Map[String, Int])] {

    // Empty accumulator: "" is only a placeholder key until the first element is added.
    override def createAccumulator(): (String, Map[String, Int]) = ("", Map.empty[String, Int])

    // Adopts the element's key and increments the counter stored under the
    // element's second field (starting from 0 when it has not been seen yet).
    override def add(value: (String, String), accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = {
      val (key, item) = value
      val counts = accumulator._2
      (key, counts.updated(item, counts.getOrElse(item, 0) + 1))
    }

    // The accumulator already has the shape of the result, so it is returned as is.
    override def getResult(accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = accumulator

    // Combines two partial accumulators by summing their counters entry by entry.
    override def merge(a: (String, Map[String, Int]), b: (String, Map[String, Int])): (String, Map[String, Int]) = {
      // An accumulator that never received an element still carries the ""
      // placeholder; fall back to the other side so the real key is not lost.
      val key = if (a._1.nonEmpty) a._1 else b._1
      // Fold b's counters into a's instead of rebuilding the map from the key-set union.
      val counts = b._2.foldLeft(a._2) { case (acc, (item, n)) =>
        acc.updated(item, acc.getOrElse(item, 0) + n)
      }
      (key, counts)
    }
  })