Я пытаюсь выучить kafka и реализовать его в моем C# API-сервисе.
Использование Kafka 2.12-2.5.0 на windows сервере и. Net core 3.1.102
У меня есть свой класс, который реализует класс BackgroundService примерно так, и все его потребление находится в этом методе, который вызывается методом ExecuteAsyn c.
Когда новое сообщение создается в topi c it потребляется просто отлично. Проблема заключается в том, что оно не будет использовать другое сообщение в течение примерно 10 секунд.
Если я пытаюсь установить смещение как можно раньше, оно потребляет только первое и повторяется каждые 10 секунд.
Что такое Я делаю не так?
private async Task StartConsumer(CancellationToken stoppingToken)
{
_logger.LogDebug("Kafka consumer starting");
while (!stoppingToken.IsCancellationRequested)
{
using (var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build())
{
consumer.Subscribe(_topic);
var consumeResult = consumer.Consume().Message;
if (consumeResult.Value != null)
{
_logger.LogDebug($"Consumed message: {consumeResult.Value} with key: {consumeResult.Key}");
}
}
}
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Task.Run(() => StartConsumer(stoppingToken));
return Task.CompletedTask;
}
Конфигурация Kafka для потребителей:
"Consumer": {
"BootstrapServers": "192.168.1.4:9092",
"AutoOffsetReset": "earliest",
"GroupId": "CsharpApp",
"EnableAutoCommit": "true",
"EnableAutoOffsetStore": "true",
"Topic": "myTopic"
}