Почему кролик удивительно медленный? - PullRequest
0 голосов
/ 05 июня 2018

Я пытаюсь реализовать мессангинг через RabbitMQ.Я всегда слышал, что это безумно быстро, поэтому я попробовал.Но по какой-то причине, это очень медленно.

У меня есть одна очередь с именем blockchain.Он имеет несколько сообщений, размер сообщения составляет около 1 КБ.Затем я создаю прослушиватель очереди, который в основном просто пишет постоянный текст в стандартный вывод:

public async Task RunAsync(CancellationToken cancellationToken)
{ 
    // using EasyNetQ in this example.
    using (var bus = RabbitHutch.CreateBus(_queueSettings.RabbitConnection)).Advanced)
    {
        using (var _ = await ConsumeBus(bus, _queueSettings.QueueName))
        {
            Console.WriteLine("Listening for messages. Hit <return> to quit.");

            cancellationToken.WaitHandle.WaitOne();
        }
    }
}

private async Task<IDisposable> ConsumeBus(IAdvancedBus bus, string queueName)
{
    var queue = await bus.QueueDeclareAsync(queueName, true).ConfigureAwait(false);
    return bus.Consume(queue,
                       (body, properties, info) => Console.WriteLine("Got a message!"));
}

Но когда я смотрю в очередь, я вижу, что скорость потребления сообщений составляет всего ~ 40 мсг /сек .По сравнению с 50k msg / sec , написанными в статьях по сети, это кажется очень медленным.Когда я перехожу на страницу управления, я вижу, что очередь почти полностью используется:

enter image description here

Строка подключения, кажется, тоже в порядке, мы берем большие партии иобработайте их:

"QueueSettings": {
    "RabbitConnection":
        "host=myhost:5672;username=reader;password=readerpass5;requestedHeartbeat=20;prefetchcount=100;timeout=1000;publisherConfirms=true",
    "QueueName": "blockchain"
},

Что здесь не так?Как я могу получить это удивительное количество тысяч сообщений в секунду за очередь?Должен ли я развернуть 1000 потребителей и ожидать, что у них будет 40 * 1000 мсг / с?


Эта версия немного быстрее, однако я все еще не могу получить более 50 мсг / с

public async Task RunAsync(CancellationToken cancellationToken)
{
    var factory = GetConnectionFactory()
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        using (var _ = ConsumeBus(channel, _queueSettings.QueueName))
        {
            _contextlessLogger.Information("Listening for messages. Hit <return> to quit.");

            cancellationToken.WaitHandle.WaitOne();
        }
    }
}

private IDisposable ConsumeBus(IModel bus, string queueName)
{
    return new Consumer(bus, queueName, async bytes => Console.WriteLine("Got a message!"));
}

public class Consumer : IDisposable
{
    private readonly IModel _channel;
    private readonly Func<byte[], Task> _action;
    private readonly EventingBasicConsumer _consumer;

    public Consumer(IModel channel, string queueName, Func<byte[], Task> action)
    {
        _channel = channel;
        _action = action;
        _consumer = new EventingBasicConsumer(channel);
        _consumer.Received += OnReceived;
        channel.BasicConsume(queueName, false, _consumer);
    }

    private async void OnReceived(object model, BasicDeliverEventArgs ea)
    {
        try
        {
            await _action(ea.Body);
            _channel.BasicAck(ea.DeliveryTag, false);
        }
        catch
        {
            _channel.BasicNack(ea.DeliveryTag, false, true);
        }
    }


    public void Dispose()
    {
        _consumer.Received -= OnReceived;
    }
}
...