У нас есть топология потоков, которая будет работать на нескольких машинах. Мы храним результаты агрегации с временными окнами в государственных хранилищах.
Так как хранилища состояния хранят локальные данные, я думаю, что агрегацию следует проводить по другой теме для общей агрегации.
Но похоже, что я что-то упускаю, потому что ни один из примеров не выполняет общее агрегирование на другом KStream или Процессоре.
Нужно ли нам использовать логику groupBy для хранения общей агрегации, или использовать GlobalKtable, или просто где-нибудь реализовать наш собственный код слияния?
Какова правильная архитектура для этого?
В приведенном ниже коде я попытался сгруппировать все сообщения, поступающие на процессор, с постоянным ключом, чтобы сохранить общую агрегацию только на одной машине, но, по-моему, он потерял бы параллелизм, который предоставляет Kafka.
dashboardItemProcessor = streamsBuilder.stream("Topic25", Consumed.with(Serdes.String(), eventSerde))
.filter((key, event) -> event != null && event.getClientCreationDate() != null);
dashboardItemProcessor.map((key, event) -> KeyValue.pair(key, event.getClientCreationDate().toInstant().toEpochMilli()))
.groupBy((key, event) -> "count", Serialized.with(Serdes.String(), Serdes.Long()))
.windowedBy(timeWindow)
.count(Materialized.as(dashboardItemUtil.getStoreName(itemId, timeWindow)));