Как использовать счетчик предварительной выборки для потребителя с клиентом .NET RabbitMQ - PullRequest
0 голосов
/ 08 февраля 2019

В документе RabbitMQ говорится следующее:

"Как правило, следует избегать совместного использования экземпляров каналов между потоками. Приложения должны предпочитать использовать канал для каждого потока вместо совместного использованияодин и тот же канал в нескольких потоках. "

В настоящее время мы смотрим на количество предварительных выборок, где рекомендуется, чтобы при небольшом количестве потребителей и autoack = false, то мы использовали много сообщений водин раз.Однако мы обнаруживаем, что предварительная выборка не действует, если потребитель отправляет подтверждения вручную, используя один поток выполнения.Однако, если мы обернем обработку потребителя в Задаче, мы обнаружим, что число предварительных выборок имеет значение и существенно улучшает производительность потребителя.

См. Следующий пример, где мы упаковываем потребление сообщения потребителем в Задачеobject:

class Program
{
    public static void Main()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "172.20.20.13",
            UserName = "billy",
            Password = "guest",
            Port = 5671,
            VirtualHost = "/",
            Ssl = new SslOption
            {
                Enabled = true,
                ServerName = "rabbit.blah.com",
                Version = System.Security.Authentication.SslProtocols.Tls12
            }
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        channel.BasicQos(0, 100, false);
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        var queueName = channel.QueueDeclare().QueueName;
        Console.WriteLine(" [*] Waiting for logs.");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var _result = new Task(() => {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                System.Threading.Thread.Sleep(80);

                channel.BasicAck(ea.DeliveryTag, false);
            });
            _result.Start();
        };
        channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

У меня вопрос: как люди реализуют потребителей с помощью клиента .NET rabbitmq, который использует счетчик предварительных выборок? Нужно ли вручную подтверждать, используя какое-то задание?сейф

Ответы [ 2 ]

0 голосов
/ 11 февраля 2019

Документация, на которую вы ссылаетесь, относится к клиенту Java.Вместо этого вы должны ссылаться на этот документ .

Вы используете последнюю версию клиента .NET (5.1), поэтому выполняете свою работу в обработчике событий Receivedне будет блокировать другие потоки, которые имеют дело с данными TCP, и не будет блокировать тактовые импульсы - оба из них хороши.

Прежде всего, вызов channel.BasicQos(0, 1, false) означает, что ваш потребитель получит только одно готовое сообщение ввремя от RabbitMQ и до вызова BasicAck другое сообщение не будет доставлено.Таким образом, на самом деле нет причин выполнять свою работу в другом потоке, поскольку вы все равно не получите другое сообщение.

Если вы увеличите значение предварительной выборки (путем экспериментов и выполнения тестов), вам придется выполнитьваша работа в фоновом потоке, если ваша работа выполняется более нескольких миллисекунд.

Когда вы выполняете свою работу в обратном вызове события Received, он блокирует поток, который используется для выполнения этого обратного вызова.поскольку обратный вызов не выполняется в своем собственном потоке.Таким образом, вы можете убедиться, что ваша работа очень короткая, или выполнить работу в другом потоке.

Я просто потратил некоторое время на просмотр клиентского кода .NET, и я почти уверен, что экземпляр IModel неПоток безопасно.Если вы увеличите предварительную выборку, у вас будет возможность подтверждать несколько сообщений одновременно, поэтому я рекомендую реализовать решение, которое использует это, а также гарантирует, что BasicAck вызывается в том же потоке, в котором создается соединение.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы по StackOverflow.

0 голосов
/ 08 февраля 2019

Источник: https://www.rabbitmq.com/api-guide.html

Когда используются ручные подтверждения, важно учитывать, какой поток выполняет подтверждение.Если он отличается от потока, который получил доставку (например, Consumer # handleDelivery делегировал обработку доставки другому потоку), подтверждение с множественным параметром, установленным в значение true, небезопасно и приведет к двойным подтверждениям, и, следовательно, исключению протокола уровня каналаэто закрывает канал.Подтверждение одного сообщения за один раз может быть безопасным.

channel.basicAck(tag, false) является поточно-ориентированным

, но consumerChannel.basicAck(tag, true) не является.

Также упоминаются некоторые хорошие моментыв RabbitMQ и каналы безопасности потока Java

...