C# медленное потребление кафки - PullRequest
0 голосов
/ 04 мая 2020

Я пытаюсь выучить kafka и реализовать его в моем C# API-сервисе.

Использование Kafka 2.12-2.5.0 на windows сервере и. Net core 3.1.102

У меня есть свой класс, который реализует класс BackgroundService примерно так, и все его потребление находится в этом методе, который вызывается методом ExecuteAsyn c.

Когда новое сообщение создается в topi c it потребляется просто отлично. Проблема заключается в том, что оно не будет использовать другое сообщение в течение примерно 10 секунд.

Если я пытаюсь установить смещение как можно раньше, оно потребляет только первое и повторяется каждые 10 секунд.

Что такое Я делаю не так?

private async Task StartConsumer(CancellationToken stoppingToken)
{
    _logger.LogDebug("Kafka consumer starting");
    while (!stoppingToken.IsCancellationRequested)
    {
        using (var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build())
        {
            consumer.Subscribe(_topic);
            var consumeResult = consumer.Consume().Message;
            if (consumeResult.Value != null)
            {
                _logger.LogDebug($"Consumed message: {consumeResult.Value} with key: {consumeResult.Key}");
            }
        }
    }
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    Task.Run(() => StartConsumer(stoppingToken));
    return Task.CompletedTask;
}

Конфигурация Kafka для потребителей:

"Consumer": {
  "BootstrapServers": "192.168.1.4:9092",
  "AutoOffsetReset": "earliest",
  "GroupId": "CsharpApp",
  "EnableAutoCommit": "true",
  "EnableAutoOffsetStore": "true",
  "Topic": "myTopic"
}

1 Ответ

0 голосов
/ 04 мая 2020

Хорошо, я понял, в чем проблема.

Я должен был иметь некоторое время внутри использования, а не наоборот.

...