Как ограничить количество записей в Кафке-потребителе - PullRequest
0 голосов
/ 03 декабря 2018

Я использую конфлюент Kafka-rest продукт, чтобы использовать записи из темы.Мое намерение состоит в том, чтобы потреблять только первые 100 записей из темы.Я использую следующий API REST для получения записей

GET /consumers/testgroup/instances/my_consumer/records

Как этого добиться?Есть идеи?

Ответы [ 3 ]

0 голосов
/ 03 декабря 2018

Насколько я знаю, в настоящее время это невозможно.Как упоминалось в другом ответе, вы можете указать максимальный размер в байтах (хотя в некоторых случаях это может быть проигнорировано посредниками), но вы не можете указать желаемое количество сообщений.

Однако такая функцияможет быть легко реализовано в вашем клиентском коде.Вы можете угадать приблизительный размер, запросить REST API и посмотреть, сколько сообщений вы получили.Если оно меньше 100, запросите его снова, чтобы получить следующие несколько сообщений, пока не достигнете 100.

0 голосов
/ 13 июня 2019

Для настройки KafkaConsumer можно использовать свойство ConsumerConfig.MAX_POLL_RECORDS_CONFIG.Пожалуйста, смотрите документ

0 голосов
/ 03 декабря 2018

Если вы пытаетесь использовать новые пакеты из 100 сообщений из вашей группы потребителей, вам следует установить для max_bytes значение, которое для вашей модели данных всегда будет возвращать примерно 100 записей.Вы можете иметь более консервативную логику (получить меньше, а затем получить немного больше, пока не достигнете 100), или вы можете получить всегда больше, а затем игнорировать.В обоих случаях вам следует принять ручное управление смещением для вашей группы потребителей.

GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000

Если вы получаете более 100 сообщений и по какой-то причине игнорируете их, вы не получите их снова в этой группе потребителей, еслисмещение автоматической фиксации включено (это определяется при создании вашего потребителя).Вы, вероятно, не хотите, чтобы это произошло!

Если вы вручную фиксируете смещения, вы можете игнорировать все, что захотите, если затем зафиксируете правильные смещения, чтобы гарантировать, что вы не потеряете ни одно сообщение.Вы можете вручную зафиксировать свои смещения следующим образом:

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": <calculated offset ending where you stopped consuming for this partition>
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": <calculated offset ending where you stopped consuming for this partition>
    }
  ]
}

Если вы пытаетесь получить ровно первые 100 записей темы, вам нужно сбросить смещения групп потребителей для этой темы и каждого раздела передВы потребляете один раз.Вы можете сделать это следующим образом ( взято из слияния ):

POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json

{
  "offsets": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 0
    },
    {
      "topic": "test",
      "partition": 1,
      "offset": 0
    }
  ]
}
...