Вы можете использовать Kafka REST Proxy для получения последнего смещения, зафиксированного для определенного раздела . Согласно Confluent Docs ,
GET /consumers/(string: group_name)/instances/(string: instance)/offsets
Получить последние зафиксированные смещения для заданных разделов (будь то
коммит произошел по тому или иному процессу).
Обратите внимание, что этот запрос должен быть сделан для конкретного прокси REST
экземпляр, содержащий экземпляр потребителя.
Параметры:
Массив объектов:
- разделы - список разделов, чтобы найти последние зафиксированные смещения для
- разделы [i] .topic (строка) - название темы
- разделы [i] .partition (int) - идентификатор раздела
Ответ JSON Массив объектов:
- смещения - список подтвержденных смещений
- смещения [i] .topic (строка) - название темы, для которой было зафиксировано смещение
- смещения [i] .partition (int) - идентификатор раздела, для которого было зафиксировано смещение
- смещения [i] .offset (int) - фиксированное смещение
- смещения [i] .metadata (строка) - метаданные для зафиксированного смещения
Коды состояния:
- 404 Не найдено -
- Код ошибки 40402 - Раздел не найден
- Код ошибки 40403 - экземпляр получателя не найден
Пример запроса:
GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json
{
"partitions": [
{
"topic": "test",
"partition": 0
},
{
"topic": "test",
"partition": 1
}
]
}
Пример ответа:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{"offsets":
[
{
"topic": "test",
"partition": 0,
"offset": 21,
"metadata":""
},
{
"topic": "test",
"partition": 1,
"offset": 31,
"metadata":""
}
]
}