Я реализовал потребителя kafka как консольное приложение, используя BackgroundService в .NET Core 2.2. Я использую confluent-kafka-dotnet v 1.0.1.1 в качестве клиента для Apache Kafka. У меня есть сомнения по поводу того, как я обрабатываю каждое сообщение.
1) Поскольку обработка каждого сообщения может занимать некоторое время (до 24 часов), я запускаю новую задачу для каждого сообщения, чтобы не блокировать потребителя от получения новых сообщений. Я думаю, что если у меня слишком много сообщений, создание новой Задачи каждый раз не является правильным способом. Как правильно обрабатывать каждое сообщение? Можно ли создать вид динамического фонового сервиса для каждого сообщения?
2) Если сообщение уже обрабатывается, но происходит сбой приложения или происходит перебалансировка, я в конечном итоге потребляю и обрабатываю одно и то же сообщение более одного раза. Должен ли я зафиксировать смещение автоматически (или сразу после его использования) и сохранить состояние сообщения (или задачи) где-нибудь (например, в базе данных)?
Я знаю, что есть Hangfire, но я не уверен, нужно ли мне его использовать. Если мой нынешний подход совершенно неверен, пожалуйста, дайте мне несколько советов. Любой совет / помощь будет принята с благодарностью.
Вот реализация ConsumerService:
public class ConsumerService : BackgroundService
{
private readonly IConfiguration _config;
private readonly IElasticLogger _logger;
private readonly ConsumerConfig _consumerConfig;
private readonly string[] _topics;
private readonly double _maxNumAttempts;
private readonly double _retryIntervalInSec;
public ConsumerService(IConfiguration config, IElasticLogger logger)
{
_config = config;
_logger = logger;
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
GroupId = _config.GetValue<string>("Kafka:GroupId"),
EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
};
_topics = _config.GetValue<string>("Kafka:Consumer:Topics").Split(',');
_maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
_retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine("!!! CONSUMER STARTED !!!\n");
// Starting a new Task here because Consume() method is synchronous
var task = Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);
return task;
}
private void ProcessQueue(CancellationToken stoppingToken)
{
using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig).SetValueDeserializer(new MessageDeserializer()).Build())
{
consumer.Subscribe(_topics);
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
// Don't want to block consume loop, so starting new Task for each message
Task.Run(async () =>
{
var currentNumAttempts = 0;
var committed = false;
var response = new Response();
while (currentNumAttempts < _maxNumAttempts)
{
currentNumAttempts++;
// SendDataAsync is a method that sends http request to some end-points
response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);
if (response != null && response.Code >= 0)
{
try
{
consumer.Commit(consumeResult);
committed = true;
break;
}
catch (KafkaException ex)
{
// log
}
}
else
{
// log
}
if (currentNumAttempts < _maxNumAttempts)
{
// Delay between tries
await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec));
}
}
if (!committed)
{
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException ex)
{
// log
}
}
}, stoppingToken);
}
catch (ConsumeException ex)
{
// log
}
}
}
catch (OperationCanceledException ex)
{
// log
consumer.Close();
}
}
}
}