Если у вас есть поток событий и вы хотите объединить его в список на основе ключа, вы можете начать с базового примера, как показано ниже:
KTable<String, List<Object>> aggregatedMetrics = eventStream
.selectKey((k,v)-> k // Pick your key here)
.groupByKey()
.aggregate(() -> ArrayList::new,
(key, value, aggregate) -> aggregate.add(value), arrayListSerde());