Потребитель RabbitMQ медленно работает, когда остаются неподтвержденные сообщения - PullRequest
0 голосов
/ 06 сентября 2018

У меня есть консольное приложение .NET Core, которое читает сообщение из RabbitMQ и сохраняет данные в базе данных. Он использует RabbitMQ.Client Assembly 5.1.0 и устанавливает EventingConsumer следующим образом:

var factory = new ConnectionFactory
{
    HostName = _hostName,
    UserName = _userName,
    Password = _password,
    RequestedHeartbeat = 20,
    AutomaticRecoveryEnabled = true,
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};

_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.BasicQos(0, prefetchCount, false);

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += HandleMessage;
_consumerTag = _channel.BasicConsume(_queueName, false, consumer);

Если я вызову _channel.BasicAck для сообщения внутри моего метода HandleMessage, то есть, как только получено каждое сообщение, скорость доставки сообщений составляет ~ 1500 в секунду. Однако я хочу дождаться подтверждения сообщения, пока оно не будет сохранено в БД. Если я это сделаю, скорость упадет до 300-500 в секунду.

Сохранение в БД осуществляется в отдельном потоке и не является узким местом. HandleMessage только сохраняет сообщение в памяти для последующего сохранения в другом потоке. Я попытался поэкспериментировать с различными значениями prefetchCount от 100 до 100 000, и это не имеет значения. Если я профилирую приложение, я вижу, что поток сеанса AMQP ("WorkPool-Session # 1: Connection (...)" тратит большую часть своего времени на ожидание WaitHandle в RabbitMQ.Client.ConsumerWorkService+WorkPool.Loop()

Что я делаю не так? Как я могу быстрее получать сообщения, не получая их сразу? (Сервер RabbitMQ 3.7.7)

1 Ответ

0 голосов
/ 09 сентября 2018

prefetchCount ограничивает количество неподтвержденных сообщений, которые будут доставлены вашему потребителю. Вы можете увеличить это значение, чтобы получать больше сообщений из очереди без их подтверждения.

Однако, поскольку кажется, что постоянство базы данных является узким местом, я ожидаю, что скорость доставки останется неизменной: Допустим, вам потребуется 2 мсек, чтобы завершить вставку базы данных на сообщение, что составляет 500 вставок / сек После того, как вы превысили количество ожидающих сообщений (количество предварительных выборок), вы получаете ACK со скоростью 500 сообщений в секунду, поэтому вы будете получать новые сообщения с такой скоростью. Размер буфера не имеет большого значения для узкого места.

Чтобы повысить пропускную способность системы, вы можете иметь дополнительных потребителей или тем или иным образом улучшить пропускную способность для базы данных (например, массовые вставки, улучшения схемы, сегментирование) и т. Д., Но у RabbitMQ нет средств для удержания бесконечное количество неподтвержденных (в полете) сообщений.

...