Я разрабатываю несколько служб на .Net Core и использую RabbitMq для отправки сообщений между службами. Для получения сообщений я создаю сервисы, унаследованные от IHostedService
, которые подключаются к RabbitMq и потребляют. Ниже вы можете найти мой базовый получатель сообщений
public abstract class MessageReceiver<TMessage> : IMessageReceiver<TMessage>
{
private readonly IServiceScopeFactory _scopeFactory;
private Task _executingTask;
private CancellationTokenSource _cts;
public string QueueName { get; private set; }
private readonly IMessageHandler<TMessage> _handler;
public MessageReceiver(IServiceScopeFactory scopeFactory, IMessageHandler<TMessage> handler, string queueName)
{
_scopeFactory = scopeFactory;
QueueName = queueName;
_handler = handler;
}
public virtual async Task ExecuteAsync(CancellationToken cancellationToken)
{
using (var scope = _scopeFactory.CreateScope())
{
var factory = scope.ServiceProvider.GetRequiredService<IConnectionFactory>();
var connection = factory.CreateConnection();
var connectionModel = connection.CreateModel();
connectionModel.QueueDeclare(QueueName, true, false, false, null);
connectionModel.BasicQos(0, 100, false);
var basicProperties = connectionModel.CreateBasicProperties();
basicProperties.Persistent = true;
while (!cancellationToken.IsCancellationRequested)
{
var consumer = new EventingBasicConsumer(connectionModel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
_handler.Handle(body);
};
connectionModel.BasicConsume(QueueName, autoAck: true, consumer: consumer);
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_executingTask = ExecuteAsync(_cts.Token);
return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_executingTask == null)
{
return;
}
_cts.Cancel();
await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken));
cancellationToken.ThrowIfCancellationRequested();
}
Как вы можете видеть, это обобщенно, поэтому я также строю IMessageHandler
, в котором фактически реализована наша бизнес-логика - обработка полученного сообщения
А вот и один из получателей сообщений
public class DemoReceiver : MessageReceiver<DemoMessage>
{
public DemoReceiver (IServiceScopeFactory factory, IMessageHandler<DemoMessage> handler) : base(factory, handler, QueueNames.Demo)
{
}
}
Еще одна важная вещь - приложения работают на Docker . Так что проблема в том, что я просто заметил, что иногда он останавливает потребление сообщений из очереди. Итак, я вижу ожидающие сообщения в панели управления очередью RabbitMq. Я не знаю, что может вызвать проблему. Может быть, IHostedService
просто «спит» и не запускается. У вас есть идеи?