Потоки Кафки не перезапускаются с агрегированными значениями - PullRequest
0 голосов
/ 07 июня 2018

Я собираю значения в потоке следующим образом:

private KTable<String, StringAggregator> aggregate(KStream<String, String> inputStream) {
    return inputStream
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .aggregate(
                    StringAggregator::new,
                    (k, v, a) -> {
                        a.add(v);
                        return a;
                    }, Materialized.<String, StringAggregator>as(Stores.persistentKeyValueStore("STATE_STORE"))
                            .withKeySerde(Serdes.String())
                            .withValueSerde(getValueSerde(StringAggregator.class)));
}

Обычно это работает отлично.Однако при перезапуске приложения агрегированное значение для ключей теряется.Кроме того, существует также вероятность того, что весь сервер будет прерван, и новый (с новой версией приложения потоков) появится в сети.Как я могу гарантировать, что значения для агрегации сохранятся?

1 Ответ

0 голосов
/ 08 июня 2018

Я закончил тем, что создал логику агрегации, которая использует результат агрегации, который был сохранен в теме кафки.Вот логика:

private KStream<String, StringAggregator> getAggregator(String topicName, 
                                                        KStream<String, String> input,
                                                        KTable<String, StringAggregator> aggregator) {

    return input
            .leftJoin(aggregator, (inputMessage, aggregatorMessage) -> { 
                if (aggregatorMessage == null) { 
                    aggregatorMessage = new StringAggregator(); 
                }
                aggregatorMessage.add(inputMessage);
                return aggregatorMessage; 
            }).peek((k, v) -> logger.info("Aggregated a join input for {}: {}, {} aggregated.", topicName, k, v.size()));
}

Вот логика для фактического построения потока.

String topicName = "input";
KStream<String, String> input = streamsBuilder.stream(topicName);
KTable<String, StringAggregator> aggregator = streamsBuilder.table("aggregate");
getAggregator(topicName, input, aggregator).to("aggregate");
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...