Я использовал хранилище состояний в памяти в потоковом приложении. У меня 2 государственных магазина. Общее количество данных 3,3 млн. В каждом государственном магазине. Всего 2 гос магазина размером 8,3гб. Количество разделов: 90. Коэффициент репликации: 1. Размер посредника: 3. У меня есть 10 экземпляров для этого потока, поэтому 1 экземпляр соответствует 9 разделам.
В соответствии с примером сценария 3 , для экземпляра использовал 830 мб оперативной памяти. но это не случилось со мной.
Проблема:
Объем используемой памяти на экземпляр составляет ~ 8,5 ГБ. Увеличение со временем и перезапуск экземпляров. Я использовал Processor API.
Что я делаю не так?
Код:
Stores
.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(STATISTIC_HISTORICAL_DAILY_STORE),
new EventKeySerde(),
new EventSerde(Map.class, String.class, UtilizationValue.class))
.withCachingEnabled()
.withLoggingEnabled(new HashMap<>()),
Stores
.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(STATISTIC_HISTORICAL_STORE),
new EventKeySerde(),
new EventSerde(Map.class, String.class, UtilizationValue.class))
.withCachingEnabled()
.withLoggingEnabled(new HashMap<>())
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationProperties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG, defaultApplicationId));
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerdeClassName);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerdeClassName);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");
// add from property files.
"[max.poll.interval.ms]": 300000
"[num.standby.replicas]": 1
@Override
public void init(ProcessorContext context) {
this.context = context;
this.statisticHistoricalDailyStore = (KeyValueStore<EventKey, Event<Map<String, UtilizationValue>>>) context.getStateStore(STATISTIC_HISTORICAL_DAILY_STORE);
this.statisticHistoricalStore = (KeyValueStore<EventKey, Event<Map<String, UtilizationValue>>>) context.getStateStore(STATISTIC_HISTORICAL_STORE);
attributeDefinitions = loadAttributeDefinitions();
context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, l -> {
logger.info("Flush state stores...");
this.statisticHistoricalDailyStore.flush();
this.statisticHistoricalStore.flush();
});
}
//In the process method, get and put operations are performed.