Я разобрался с решением с Flink
.
Я прочитал некоторую документацию по временному окну во Флинке, и на этой странице говорится о восходящей отметке времени в теме (это мой случай).
Итак, вот решение:
val inputStream: DataStream[Message] = env.addSource(kafkaConsumer)
val timedStream: DataStream[Message] = inputStream
.assignAscendingTimestamps(_.timestamp)
val timeWindow = timedStream.timeWindowAll(Time.minutes(1)).sum(1)
Подсчитывает все элементы в падающем окне за 1 минуту.
Для более конкретного решения и получения Counter("2018-05-17 00:00:00", 100)
мы должны расширить AllWindowFunction
:
class CustomWindowFunction extends AllWindowFunction[Message, Counter, TimeWindow] {
def apply(window: TimeWindow, input: Iterable[Message], out: Collector[Counter]): Unit = {
out.collect(
Counter(
new LocalDateTime(window.getStart),
input.size
)
)
}
}
А затем примените его к нашему timeStream:
val inputStream: DataStream[MyClass] = env.addSource(kafkaConsumer)
val timedStream: DataStream[MyClass] = inputStream
.assignAscendingTimestamps(_.timestamp)
val timeWindow = timedStream.timeWindowAll(Time.minutes(1)).apply(new CustomWindowFunction())
Если в нашей теме ввода у нас есть класс Message
, мы получаем класс Counter
в конце.
Это "лучшее" решение, которое я нашел на данный момент.