Проблема использования памяти с Kafka In-memory State store - PullRequest
1 голос
/ 07 января 2020

Я использовал хранилище состояний в памяти в потоковом приложении. У меня 2 государственных магазина. Общее количество данных 3,3 млн. В каждом государственном магазине. Всего 2 гос магазина размером 8,3гб. Количество разделов: 90. Коэффициент репликации: 1. Размер посредника: 3. У меня есть 10 экземпляров для этого потока, поэтому 1 экземпляр соответствует 9 разделам.

В соответствии с примером сценария 3 , для экземпляра использовал 830 мб оперативной памяти. но это не случилось со мной.

Проблема:

Объем используемой памяти на экземпляр составляет ~ 8,5 ГБ. Увеличение со временем и перезапуск экземпляров. Я использовал Processor API.

Что я делаю не так?

Код:

Stores
    .keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(STATISTIC_HISTORICAL_DAILY_STORE),
            new EventKeySerde(),
            new EventSerde(Map.class, String.class, UtilizationValue.class))
    .withCachingEnabled()
    .withLoggingEnabled(new HashMap<>()),
Stores
    .keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(STATISTIC_HISTORICAL_STORE),
            new EventKeySerde(),
            new EventSerde(Map.class, String.class, UtilizationValue.class))
    .withCachingEnabled()
    .withLoggingEnabled(new HashMap<>())

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationProperties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG, defaultApplicationId));
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerdeClassName);
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerdeClassName);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");

    // add from property files.
    "[max.poll.interval.ms]": 300000
    "[num.standby.replicas]": 1

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.statisticHistoricalDailyStore = (KeyValueStore<EventKey, Event<Map<String, UtilizationValue>>>) context.getStateStore(STATISTIC_HISTORICAL_DAILY_STORE);
        this.statisticHistoricalStore = (KeyValueStore<EventKey, Event<Map<String, UtilizationValue>>>) context.getStateStore(STATISTIC_HISTORICAL_STORE);
        attributeDefinitions = loadAttributeDefinitions();

        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, l -> {
            logger.info("Flush state stores...");
            this.statisticHistoricalDailyStore.flush();
            this.statisticHistoricalStore.flush();
        });
    }

   //In the process method, get and put operations are performed.
...