Не читать следующее сообщение Кафки, пока текущее сообщение не будет обработано? - PullRequest
0 голосов
/ 14 сентября 2018

При использовании сообщений с использованием библиотеки Confluent Kafka C # я хочу видеть новое сообщение только после завершения обработки текущего (мне нужно снова прочитать то же сообщение, если что-то не получается). Другими словами, я не хочу, чтобы смещение изменялось, пока я явно не скажу, чтобы оно изменилось.

Чтобы попытаться добиться этого, я деактивирую автокоммит в конфиге (как в примерах):

{ "enable.auto.commit", false }
{ "auto.offset.reset", "smallest" }

Затем я закомментирую строку коммита:

while(true)
{
    if (!consumer.Consume(out Message<string, string> msg, TimeSpan.FromMilliseconds(100)))
    {
        continue;
    }

    //I thought by removing this line, I would keep getting the same message (until I've processed the message and commited the offset)
    //consumer.CommitAsync(msg).Result;
}

Я надеялся, что, не фиксируя, я продолжу получать одно и то же сообщение при вызове Consume (), но это не так. Даже если я не фиксирую, смещение постоянно меняется, и я получаю новые сообщения при каждом потреблении.

Пожалуйста, проясните мое очевидное недоразумение?

1 Ответ

0 голосов
/ 14 сентября 2018

Вызов CommitAsync сохраняет ваше смещение в теме смещения, сохраненной kafka. Таким образом, если бы вы избавились от своего Потребителя и создали новый, вы бы начали заново с последнего зафиксированного смещения, или если вы не зафиксировали смещение, вы бы начали со смещения 0.

Если не зафиксировать смещение, но продолжать использовать того же потребителя в вашем приложении, то смещение все равно будет увеличиваться и храниться в памяти потребителем.

Хотя это документация для потребительского класса Java в разделе Смещения и позиция потребителя , в нем более подробно описывается функция смещений и фиксации. KafkaConsumer Docs

...