RabbitMq перестал получать сообщения .Net Core - PullRequest
0 голосов
/ 16 мая 2018

Я разрабатываю несколько служб на .Net Core и использую RabbitMq для отправки сообщений между службами. Для получения сообщений я создаю сервисы, унаследованные от IHostedService, которые подключаются к RabbitMq и потребляют. Ниже вы можете найти мой базовый получатель сообщений

public abstract class MessageReceiver<TMessage> : IMessageReceiver<TMessage>
{
    private readonly IServiceScopeFactory _scopeFactory;        

    private Task _executingTask;

    private CancellationTokenSource _cts;

    public string QueueName { get; private set; }

    private readonly IMessageHandler<TMessage> _handler;

    public MessageReceiver(IServiceScopeFactory scopeFactory, IMessageHandler<TMessage> handler, string queueName)
    {
        _scopeFactory = scopeFactory;
        QueueName = queueName;
        _handler = handler;
    }

    public virtual async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        using (var scope = _scopeFactory.CreateScope())
        {
            var factory = scope.ServiceProvider.GetRequiredService<IConnectionFactory>();
            var connection = factory.CreateConnection();
            var connectionModel = connection.CreateModel();
            connectionModel.QueueDeclare(QueueName, true, false, false, null);
            connectionModel.BasicQos(0, 100, false);
            var basicProperties = connectionModel.CreateBasicProperties();
            basicProperties.Persistent = true;

            while (!cancellationToken.IsCancellationRequested)
            {
                var consumer = new EventingBasicConsumer(connectionModel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    _handler.Handle(body);
                };

                connectionModel.BasicConsume(QueueName, autoAck: true, consumer: consumer);
                await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
            }
        }
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _executingTask = ExecuteAsync(_cts.Token);
        return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_executingTask == null)
        {
            return;
        }

        _cts.Cancel();
        await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken));
        cancellationToken.ThrowIfCancellationRequested();
    }

Как вы можете видеть, это обобщенно, поэтому я также строю IMessageHandler, в котором фактически реализована наша бизнес-логика - обработка полученного сообщения

А вот и один из получателей сообщений

public class DemoReceiver : MessageReceiver<DemoMessage>
{        
    public DemoReceiver (IServiceScopeFactory factory, IMessageHandler<DemoMessage> handler) : base(factory, handler, QueueNames.Demo)
    {
    }
}

Еще одна важная вещь - приложения работают на Docker . Так что проблема в том, что я просто заметил, что иногда он останавливает потребление сообщений из очереди. Итак, я вижу ожидающие сообщения в панели управления очередью RabbitMq. Я не знаю, что может вызвать проблему. Может быть, IHostedService просто «спит» и не запускается. У вас есть идеи?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...