А темы Кафки в основном материализуют бесконечный поток событий.
Так когда же остановиться при потреблении из темы?Как вы узнали, что достигли конца?Короткий ответ - нет!Теоретически, продюсер всегда может отправить новое сообщение в тему.
На практике есть несколько вещей, которые можно сделать, чтобы остановиться в конце, если не добавлено / добавлено несколько новых записей.
Используя endOffsets()
, вы можете найти текущее последнее смещение разделов.Как только потребитель достигнет этого смещения для всех разделов, которым он назначен, вы можете прекратить опрос (или обновить его и посмотреть, были ли отправлены новые сообщения).
Вы можете получить текущую позицию в каждом разделе, используяposition()
метод.При использовании каждая запись также выставляет свое собственное смещение через offset()
.Таким образом, вы можете использовать их для отслеживания прогресса в направлении конечных смещений.
Относительно вашего второго вопроса о poll()
ничего не возвращая при первом вызове.Ожидается, что в основном poll()
заставляет клиента работать и при первом вызове он инициирует соединение с кластером и запускает групповой протокол (который занимает несколько секунд), поэтому маловероятно, что сообщения будут получены до poll()
возвращается.