Мне трудно понять, как работает Windowing в Kafka Streams.Результаты не соответствуют тому, что я прочитал и понял до сих пор.
Я создал поток KSQL с вспомогательной темой.один из 'столбцов' в операторе KSQL SELECT был определен как TIMESTAMP для темы.
CREATE STREAM my_stream WITH (KAFKA_topic='my-stream-topic', VALUE_FORMAT='json', TIMESTAMP='recorded_timestamp') AS select <select list> PARTITION BY PARTITION_KEY;
Записи в my-stream-topic группируются по ключу (PARTITION_KEY) и отображаются в окне с переключениемwindow
val dataWindowed: TimeWindowedKStream[String, TopicValue] = builder.stream('my-stream-topic', consumed)
.groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
.windowedBy(TimeWindows.`of`(TimeUnit.MINUTES.toMillis(5)).advanceBy(TimeUnit.MINUTES.toMillis(1)).until(TimeUnit.MINUTES.toMillis(5)))
Записи агрегируются через
val dataAgg: KTable[Windowed[String], ValueStats] = dataWindowed
.aggregate(
new Initializer[TopicStats] {<code omitted>}},
new Aggregator[String, TopicValue, TopicStats] {<code omitted>}},
Materialized.`as`[String, TopicStats, WindowStore[Bytes, Array[Byte]]]("time-windowed-aggregated-stream-store")
.withValueSerde(new JSONSerde[TopicStats])
)
val topicStats: KStream[String, TopicValueStats] = dataAgg
.toStream()
.map( <code omitted for brevity>)
Затем я печатаю на консоль через
dataAgg.print()
topicStats.print()
Первое в группе окно переводится в 7:00- 7: 05
Когда я проверяю записи в my-stream-topic через консольного потребителя, я вижу, что есть 2 записи, которые должны попадать в указанное выше окно.Однако агрегатор выбирает только 1 из них.
Я думал, что KTable с окном dataAgg будет содержать 1 запись для сгруппированного ключа, но агрегат использовал бы 2 записи для вычисления агрегата.Напечатанное совокупное значение неверно.
Чего мне не хватает?