Снова использовать то же сообщение, если обработка сообщения не удалась - PullRequest
10 голосов
/ 08 марта 2020

Я использую Confluent.Kafka. NET клиент версии 1.3.0. Я слежу за документами :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

Проблема в том, что даже если я закомментирую строку consumer.StoreOffset(consumerResult);, я получу следующее неиспользованное сообщение в следующий раз, когда я Потребляют , то есть смещение продолжает увеличиваться, что, по-видимому, не соответствует заявленной в документации документации, т.е. хотя бы одна поставка .

Даже если я установил EnableAutoCommit = false и удалите 'EnableAutoOffsetStore = false' из конфигурации и замените consumer.StoreOffset(consumerResult) на consumer.Commit(), я все еще вижу то же поведение, то есть даже если я закомментирую Commit, я все равно продолжаю получать следующие неиспользованные сообщения.

Я чувствую, что здесь упускаю что-то фундаментальное, но не могу понять, что. Любая помощь приветствуется!

Ответы [ 2 ]

0 голосов
/ 16 марта 2020

Возможно, вы захотите иметь логи повторной попытки c для обработки каждого из ваших сообщений в течение фиксированного числа раз, например, скажем 5. Если это не удастся в течение этих 5 попыток, вы можете добавить это сообщение. другой топи c для обработки всех неудачных сообщений, которые имеют приоритет над вашей фактической топи c. Или вы можете захотеть добавить сообщение с ошибкой в ​​ту же топи c, чтобы оно было забрано позже, когда все эти другие сообщения будут использованы.

Если обработка любого сообщения была успешной в течение этих 5 попыток , вы можете перейти к следующему сообщению в очереди.

0 голосов
/ 09 марта 2020

Извините, я не могу добавить комментарий. Потребитель Kafka потребляет сообщение в пакетах, , поэтому, возможно, вы все еще итерируете пакет, предварительно выбранный фоновым потоком .

Вы можете проверить, действительно ли ваш потребитель фиксирует смещение или нет, используя kafka util kafka-consumer-groups.sh

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group consumer_group  --describe
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...