использование кучи потока kafka - PullRequest
2 голосов
/ 21 октября 2019

Я использую поток Кафки 2.1

Я пытаюсь объединить поток сообщений по их идентификатору. У нас примерно 20 сообщений с одинаковым идентификатором, созданных почти одновременно (максимум пара сотен мс между двумя сообщениями). Поэтому я использую окно сеанса с интервалом бездействия 500 мс и временем отсрочки 5 секунд.

Входящие записи имеют идентификатор в качестве ключа, а значение состоит из нескольких строковых полей + карты, которая можетсодержит от 0 до нескольких сотен записей (ключ - строка, значение - объект с одним строковым полем).

Вот код:


private final Duration INACTIVITY_GAP = Duration.ofMillis(500);
private final Duration GRACE_TIME = Duration.ofMillis(5000);

KStream<String, MyCustomMessage> source = streamsBuilder.stream("inputTopic", Consumed.with(Serdes.String(), myCustomSerde));

source
  .groupByKey(Grouped.with(Serdes.String(), myCustomSerde))                
  .windowedBy(SessionWindows.with(INACTIVITY_GAP).grace(GRACE_TIME))
  .aggregate(
     // initializer
     () -> {
         return new CustomAggMessage();
     },
     // aggregates records in same session
     (s, message, aggMessage) -> {
        // ...
         return aggMessage;
     },
     // merge sessions
     (s, aggMessage1, aggMessage2) -> {
         // ... merge
         return aggMessage2;
     },
     Materialized.with(Serdes.String(), myCustomAggSerde)
   )
   .suppress(Suppressed.untilWindowCloses(unbounded()))
   .selectKey((windowed, o) -> windowed.key());
   .toStream().to("outputTopic")

Я также попробовал другой Подавленный:.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30), maxBytes(1_000_000_000L).emitEarlyWhenFull())). Это не помогло.

У меня есть пользовательская конфигурация rockdb:

public class CustomRocksDbConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
    }
}

Входная тема имеет 32 раздела (около 10k мсг / сек), и мы запускаем 8 экземпляров с 4 потоковыми потоками каждый.

При запуске этого. Использование кучи очень велико (максимальная куча установлена ​​на 4G, на машине - 8G) и вызывает сбой и перезапуск приложения, поэтому задержка увеличивается.

Кто-то знает почему? Что я могу изменить, чтобы это работало? Является ли окно сеанса и его параметры правильным способом для достижения этого?

...