Я использую агрегацию KafkaStreams с пользовательским TimestampExtractor.Когда я перезапускаю приложение, агрегация начинается с начала.
StreamsBuilder builder = new StreamsBuilder()
KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(aggregationMinutes)))
.aggregate(
{ new AggregatorModel() },
{ key, value, aggregate ->
return new aggregation.add(value)
}
)
.toStream()
.map({ k, v ->
new KeyValue<>(k.window().end(), v)
})
.to('output')
def config = new Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerHost)
config.put(ConsumerConfig.GROUP_ID_CONFIG, 'group-id')
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName())
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
kafkaStreams.start()
Что я делаю не так?