Насколько я понимаю, вы пытаетесь добиться, чтобы в вашем приложении была построена карта на основе значений, которые уже находятся в определенной теме.
Для этой задачи вместо ручного опроса темы, вы можете использовать Ktable в DSL Kafka Streams, который автоматически создаст читаемое хранилище значений ключей, которое будет отказоустойчивым, с поддержкой репликации и автоматически заполнится новыми значениями.
Вы можете сделать это простовызывая groupByKey в потоке, а затем используя агрегат.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> myKStream = builder.stream(Serdes.String(), Serdes.Long(), "topic_name");
KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);
(Фактический код может отличаться в зависимости от версии kafka, ваших настроек и т. д.)
Подробнее о концепциях Kafka Stream здесь
Затем я перебираю потребителя (метод опроса), чтобы получить все сообщения и остановиться, когда записи потребителя станут пустыми
Kafka - это платформа потоковой передачи сообщений.Любые передаваемые вами данные постоянно обновляются, и вам, вероятно, не следует использовать их так, как вы ожидаете, что потребление прекратится после определенного количества сообщений.Как вы будете обрабатывать, если новое сообщение приходит после того, как вы остановите потребителя?
Кроме того, причина, по которой вы получаете нулевые записи, возможно, связана с записями, находящимися в разных разделах и т. Д.
ЧтоВаш конкретный вариант использования здесь ?, Возможно, есть хороший способ сделать это с помощью самой семантики Kafka.