Проблемы с распределением памяти Kafka Streams с RocksDB - PullRequest
3 голосов
/ 06 мая 2020
• 1000 Поэтому я слежу за реализацией из учебника, подобной приведенной ниже
events
  .groupByKey()
  .windowedBy(
    TimeWindows.of(Duration.ofMinutes(1).grace(Duration.ZERO))
    aggregate(...),Materialized.as("agg-metric")).withRetention(Duration.ofMinutes(5))

  .suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
  .toStream((key, value) -> key.key())

Кажется, все работает нормально, за исключением того, что объем моей памяти постоянно увеличивается. Я вижу, что моя кучная память стабильна, поэтому я предполагаю, что проблема связана с созданными экземплярами RocksDB.

У меня 10 разделов, и поскольку каждое окно (по умолчанию 3 сегмента на раздел) создает 3 сегмента , Я ожидаю, что всего будет 30 экземпляров RocksDB. Поскольку значения конфигурации по умолчанию для RocksDB довольно велики для моего приложения, я решил изменить конфигурацию по умолчанию для RocksDB и реализовать RocksDBConfigSetter в соответствии с приведенным ниже кодом, по сути пытаясь наложить ограничение на потребление памяти вне кучи.

private static final long BLOCK_CACHE_SIZE = 8 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final long WRITE_BUFFER_SIZE = 2 * 1024 * 1024L;
private static final int MAX_WRITE_BUFFERS = 2;

private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(BLOCK_CACHE_SIZE); // 8MB
private org.rocksdb.Filter filter = new org.rocksdb.BloomFilter();

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
     BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
    tableConfig.setBlockCache(cache) // 8 MB
    tableConfig.setBlockSize(BLOCK_SIZE); // 4 KB
    tableConfig.setCacheIndexAndFilterBlocks(true);;
    tableConfig.setPinTopLevelIndexAndFilter(true);

    tableConfig.setFilter(new org.rocksdb.BloomFilter())
    options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); // 2 memtables
    options.setWriteBufferSize(WRITE_BUFFER_SIZE);  // 2 MB
    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);

    options.setTableFormatConfig(tableConfig);
}

На основе значений конфигурации выше и https://docs.confluent.io/current/streams/sizing.html Я ожидаю, что общая память, выделенная для RocksDB, будет

 (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb => 2*2 + 8 => 12MB

Поэтому для 30 экземпляров общая выделенная память вне кучи будет на 12 * 30 = 360 МБ .

Когда я пытаюсь запустить это приложение на виртуальной машине с памятью 2 ГБ, я назначаю 512 МБ для кучи приложения потоков kafka, поэтому на основе моих логика / понимание общей выделенной памяти должно выйти на плато при значении ниже 1 ГБ (512 + 360).

К сожалению, похоже, что это не так, поскольку моя память не перестает расти, хотя и медленно после точка, но стабильно почти ~ 2% в день, и неизбежно использование всей памяти виртуальной машины в какой-то момент и окончательный Ly убить процесс. Более того, я никогда не наблюдаю высвобождения памяти вне кучи, даже когда мой трафик c становится очень низким.

В результате мне интересно, что я делаю не так в таком простой и распространенный вариант использования. Я что-то упустил при расчете потребления памяти для моего приложения? Можно ли ограничить потребление памяти на моей виртуальной машине и какие настройки мне нужно изменить в моей конфигурации, чтобы ограничить выделение памяти для потокового приложения Kafka и экземпляров RocksDB?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...