Я использую Confluent.Kafka. NET клиент версии 1.3.0. Я слежу за документами :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Проблема в том, что даже если я закомментирую строку consumer.StoreOffset(consumerResult);
, я получу следующее неиспользованное сообщение в следующий раз, когда я Потребляют , то есть смещение продолжает увеличиваться, что, по-видимому, не соответствует заявленной в документации документации, т.е. хотя бы одна поставка .
Даже если я установил EnableAutoCommit = false
и удалите 'EnableAutoOffsetStore = false' из конфигурации и замените consumer.StoreOffset(consumerResult)
на consumer.Commit()
, я все еще вижу то же поведение, то есть даже если я закомментирую Commit
, я все равно продолжаю получать следующие неиспользованные сообщения.
Я чувствую, что здесь упускаю что-то фундаментальное, но не могу понять, что. Любая помощь приветствуется!