Я использую поток Кафки 2.1
Я пытаюсь объединить поток сообщений по их идентификатору. У нас примерно 20 сообщений с одинаковым идентификатором, созданных почти одновременно (максимум пара сотен мс между двумя сообщениями). Поэтому я использую окно сеанса с интервалом бездействия 500 мс и временем отсрочки 5 секунд.
Входящие записи имеют идентификатор в качестве ключа, а значение состоит из нескольких строковых полей + карты, которая можетсодержит от 0 до нескольких сотен записей (ключ - строка, значение - объект с одним строковым полем).
Вот код:
private final Duration INACTIVITY_GAP = Duration.ofMillis(500);
private final Duration GRACE_TIME = Duration.ofMillis(5000);
KStream<String, MyCustomMessage> source = streamsBuilder.stream("inputTopic", Consumed.with(Serdes.String(), myCustomSerde));
source
.groupByKey(Grouped.with(Serdes.String(), myCustomSerde))
.windowedBy(SessionWindows.with(INACTIVITY_GAP).grace(GRACE_TIME))
.aggregate(
// initializer
() -> {
return new CustomAggMessage();
},
// aggregates records in same session
(s, message, aggMessage) -> {
// ...
return aggMessage;
},
// merge sessions
(s, aggMessage1, aggMessage2) -> {
// ... merge
return aggMessage2;
},
Materialized.with(Serdes.String(), myCustomAggSerde)
)
.suppress(Suppressed.untilWindowCloses(unbounded()))
.selectKey((windowed, o) -> windowed.key());
.toStream().to("outputTopic")
Я также попробовал другой Подавленный:.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30), maxBytes(1_000_000_000L).emitEarlyWhenFull()))
. Это не помогло.
У меня есть пользовательская конфигурация rockdb:
public class CustomRocksDbConfig implements RocksDBConfigSetter {
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
}
}
Входная тема имеет 32 раздела (около 10k мсг / сек), и мы запускаем 8 экземпляров с 4 потоковыми потоками каждый.
При запуске этого. Использование кучи очень велико (максимальная куча установлена на 4G, на машине - 8G) и вызывает сбой и перезапуск приложения, поэтому задержка увеличивается.
Кто-то знает почему? Что я могу изменить, чтобы это работало? Является ли окно сеанса и его параметры правильным способом для достижения этого?