• 1000 Поэтому я слежу за реализацией из учебника, подобной приведенной ниже
events
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofMinutes(1).grace(Duration.ZERO))
aggregate(...),Materialized.as("agg-metric")).withRetention(Duration.ofMinutes(5))
.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
.toStream((key, value) -> key.key())
Кажется, все работает нормально, за исключением того, что объем моей памяти постоянно увеличивается. Я вижу, что моя кучная память стабильна, поэтому я предполагаю, что проблема связана с созданными экземплярами RocksDB.
У меня 10 разделов, и поскольку каждое окно (по умолчанию 3 сегмента на раздел) создает 3 сегмента , Я ожидаю, что всего будет 30 экземпляров RocksDB. Поскольку значения конфигурации по умолчанию для RocksDB довольно велики для моего приложения, я решил изменить конфигурацию по умолчанию для RocksDB и реализовать RocksDBConfigSetter в соответствии с приведенным ниже кодом, по сути пытаясь наложить ограничение на потребление памяти вне кучи.
private static final long BLOCK_CACHE_SIZE = 8 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final long WRITE_BUFFER_SIZE = 2 * 1024 * 1024L;
private static final int MAX_WRITE_BUFFERS = 2;
private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(BLOCK_CACHE_SIZE); // 8MB
private org.rocksdb.Filter filter = new org.rocksdb.BloomFilter();
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
tableConfig.setBlockCache(cache) // 8 MB
tableConfig.setBlockSize(BLOCK_SIZE); // 4 KB
tableConfig.setCacheIndexAndFilterBlocks(true);;
tableConfig.setPinTopLevelIndexAndFilter(true);
tableConfig.setFilter(new org.rocksdb.BloomFilter())
options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); // 2 memtables
options.setWriteBufferSize(WRITE_BUFFER_SIZE); // 2 MB
options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
options.setTableFormatConfig(tableConfig);
}
На основе значений конфигурации выше и https://docs.confluent.io/current/streams/sizing.html Я ожидаю, что общая память, выделенная для RocksDB, будет
(write_buffer_size_mb * write_buffer_count) + block_cache_size_mb => 2*2 + 8 => 12MB
Поэтому для 30 экземпляров общая выделенная память вне кучи будет на 12 * 30 = 360 МБ .
Когда я пытаюсь запустить это приложение на виртуальной машине с памятью 2 ГБ, я назначаю 512 МБ для кучи приложения потоков kafka, поэтому на основе моих логика / понимание общей выделенной памяти должно выйти на плато при значении ниже 1 ГБ (512 + 360).
К сожалению, похоже, что это не так, поскольку моя память не перестает расти, хотя и медленно после точка, но стабильно почти ~ 2% в день, и неизбежно использование всей памяти виртуальной машины в какой-то момент и окончательный Ly убить процесс. Более того, я никогда не наблюдаю высвобождения памяти вне кучи, даже когда мой трафик c становится очень низким.
В результате мне интересно, что я делаю не так в таком простой и распространенный вариант использования. Я что-то упустил при расчете потребления памяти для моего приложения? Можно ли ограничить потребление памяти на моей виртуальной машине и какие настройки мне нужно изменить в моей конфигурации, чтобы ограничить выделение памяти для потокового приложения Kafka и экземпляров RocksDB?