Если вы пытаетесь использовать новые пакеты из 100 сообщений из вашей группы потребителей, вам следует установить для max_bytes значение, которое для вашей модели данных всегда будет возвращать примерно 100 записей.Вы можете иметь более консервативную логику (получить меньше, а затем получить немного больше, пока не достигнете 100), или вы можете получить всегда больше, а затем игнорировать.В обоих случаях вам следует принять ручное управление смещением для вашей группы потребителей.
GET /consumers/testgroup/instances/my_consumer/records?max_bytes=300000
Если вы получаете более 100 сообщений и по какой-то причине игнорируете их, вы не получите их снова в этой группе потребителей, еслисмещение автоматической фиксации включено (это определяется при создании вашего потребителя).Вы, вероятно, не хотите, чтобы это произошло!
Если вы вручную фиксируете смещения, вы можете игнорировать все, что захотите, если затем зафиксируете правильные смещения, чтобы гарантировать, что вы не потеряете ни одно сообщение.Вы можете вручную зафиксировать свои смещения следующим образом:
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": <calculated offset ending where you stopped consuming for this partition>
},
{
"topic": "test",
"partition": 1,
"offset": <calculated offset ending where you stopped consuming for this partition>
}
]
}
Если вы пытаетесь получить ровно первые 100 записей темы, вам нужно сбросить смещения групп потребителей для этой темы и каждого раздела передВы потребляете один раз.Вы можете сделать это следующим образом ( взято из слияния ):
POST /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": 0
},
{
"topic": "test",
"partition": 1,
"offset": 0
}
]
}