Весенний кафка сбрасывает агрегацию после перезапуска приложения - PullRequest
0 голосов
/ 23 октября 2018

Я использую агрегацию KafkaStreams с пользовательским TimestampExtractor.Когда я перезапускаю приложение, агрегация начинается с начала.

    StreamsBuilder builder = new StreamsBuilder()
    KStream stream = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))

    KTable table = stream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(aggregationMinutes)))
            .aggregate(
            { new AggregatorModel() },
            { key, value, aggregate ->


                return new aggregation.add(value)
            }
    )
            .toStream()
            .map({ k, v ->
        new KeyValue<>(k.window().end(), v)
    })
            .to('output')

    def config = new Properties()
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerHost)
    config.put(ConsumerConfig.GROUP_ID_CONFIG, 'group-id')
    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName())
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, TimeUnit.SECONDS.toMillis(60))

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config)
    kafkaStreams.start()

Что я делаю не так?

1 Ответ

0 голосов
/ 02 ноября 2018

Я нашел в чем проблема.Я агрегировал данные более 3 дней назад, но параметр "windowstore.changelog.additional.retention.ms" по умолчанию установлен 24 часа.И моя агрегация начинается с начала.Когда я собираю данные за текущий день, все идет хорошо.

...