как получить доступ к последнему смещению темы в оставшейся прокси кафки, чтобы рассчитать лаг - PullRequest
0 голосов
/ 11 июля 2019

В доверенном прокси-кафке остатка мы можем получить последнее зафиксированное смещение определенной группы потребителей, но как мы можем получить последнее смещение темы для вычисления лага.

1 Ответ

0 голосов
/ 11 июля 2019

Вы можете использовать Kafka REST Proxy для получения последнего смещения, зафиксированного для определенного раздела . Согласно Confluent Docs ,

GET /consumers/(string: group_name)/instances/(string: instance)/offsets

Получить последние зафиксированные смещения для заданных разделов (будь то коммит произошел по тому или иному процессу).

Обратите внимание, что этот запрос должен быть сделан для конкретного прокси REST экземпляр, содержащий экземпляр потребителя.

Параметры:

  • имя_группы (строка) - имя группы потребителей

  • экземпляр (строка) - идентификатор экземпляра потребителя Запрос JSON

Массив объектов:

  • разделы - список разделов, чтобы найти последние зафиксированные смещения для
  • разделы [i] .topic (строка) - название темы
  • разделы [i] .partition (int) - идентификатор раздела

Ответ JSON Массив объектов:

  • смещения - список подтвержденных смещений
  • смещения [i] .topic (строка) - название темы, для которой было зафиксировано смещение
  • смещения [i] .partition (int) - идентификатор раздела, для которого было зафиксировано смещение
  • смещения [i] .offset (int) - фиксированное смещение
  • смещения [i] .metadata (строка) - метаданные для зафиксированного смещения

Коды состояния:

  • 404 Не найдено -
  • Код ошибки 40402 - Раздел не найден
  • Код ошибки 40403 - экземпляр получателя не найден

Пример запроса:

GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

Пример ответа:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{"offsets":
 [
  {
    "topic": "test",
    "partition": 0,
    "offset": 21,
    "metadata":""
  },
  {
    "topic": "test",
    "partition": 1,
    "offset": 31,
    "metadata":""
  }
 ]
}
...