У меня есть приложение, которое реализует API, как GET / themes / (строка: имя_позиции) / partitions / (int: partition_id) / messages? Offset = (int) [& count = (int)] изConfluent REST Proxy для Kafka.
Итак, у меня есть пул потребителей.Мой обработчик API довольно прост:
- Получить получателя из пула
consumer.assign(util.Arrays.asList(partition))
consumer.seek(partition, startOffset)
consumer.poll(Duration.ofMillis(300L))
consumer.unsubscribe()
- Возврат потребителя в пул
Итак, мое решение работает довольно хорошо в течение нескольких дней.Затем что-то происходит, и poll()
всегда возвращает пустой список записей.
Я могу это исправить, чтобы перезапустить свое приложение.Кроме того, я могу запустить новый экземпляр приложения, и этот может читать записи из Какфа, так что Кафка жив.