Я пытаюсь реализовать мессангинг через RabbitMQ.Я всегда слышал, что это безумно быстро, поэтому я попробовал.Но по какой-то причине, это очень медленно.
У меня есть одна очередь с именем blockchain
.Он имеет несколько сообщений, размер сообщения составляет около 1 КБ.Затем я создаю прослушиватель очереди, который в основном просто пишет постоянный текст в стандартный вывод:
public async Task RunAsync(CancellationToken cancellationToken)
{
// using EasyNetQ in this example.
using (var bus = RabbitHutch.CreateBus(_queueSettings.RabbitConnection)).Advanced)
{
using (var _ = await ConsumeBus(bus, _queueSettings.QueueName))
{
Console.WriteLine("Listening for messages. Hit <return> to quit.");
cancellationToken.WaitHandle.WaitOne();
}
}
}
private async Task<IDisposable> ConsumeBus(IAdvancedBus bus, string queueName)
{
var queue = await bus.QueueDeclareAsync(queueName, true).ConfigureAwait(false);
return bus.Consume(queue,
(body, properties, info) => Console.WriteLine("Got a message!"));
}
Но когда я смотрю в очередь, я вижу, что скорость потребления сообщений составляет всего ~ 40 мсг /сек .По сравнению с 50k msg / sec , написанными в статьях по сети, это кажется очень медленным.Когда я перехожу на страницу управления, я вижу, что очередь почти полностью используется:
Строка подключения, кажется, тоже в порядке, мы берем большие партии иобработайте их:
"QueueSettings": {
"RabbitConnection":
"host=myhost:5672;username=reader;password=readerpass5;requestedHeartbeat=20;prefetchcount=100;timeout=1000;publisherConfirms=true",
"QueueName": "blockchain"
},
Что здесь не так?Как я могу получить это удивительное количество тысяч сообщений в секунду за очередь?Должен ли я развернуть 1000 потребителей и ожидать, что у них будет 40 * 1000 мсг / с?
Эта версия немного быстрее, однако я все еще не могу получить более 50 мсг / с
public async Task RunAsync(CancellationToken cancellationToken)
{
var factory = GetConnectionFactory()
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
using (var _ = ConsumeBus(channel, _queueSettings.QueueName))
{
_contextlessLogger.Information("Listening for messages. Hit <return> to quit.");
cancellationToken.WaitHandle.WaitOne();
}
}
}
private IDisposable ConsumeBus(IModel bus, string queueName)
{
return new Consumer(bus, queueName, async bytes => Console.WriteLine("Got a message!"));
}
public class Consumer : IDisposable
{
private readonly IModel _channel;
private readonly Func<byte[], Task> _action;
private readonly EventingBasicConsumer _consumer;
public Consumer(IModel channel, string queueName, Func<byte[], Task> action)
{
_channel = channel;
_action = action;
_consumer = new EventingBasicConsumer(channel);
_consumer.Received += OnReceived;
channel.BasicConsume(queueName, false, _consumer);
}
private async void OnReceived(object model, BasicDeliverEventArgs ea)
{
try
{
await _action(ea.Body);
_channel.BasicAck(ea.DeliveryTag, false);
}
catch
{
_channel.BasicNack(ea.DeliveryTag, false, true);
}
}
public void Dispose()
{
_consumer.Received -= OnReceived;
}
}