RabbitMQ и C # - PullRequest
       5

RabbitMQ и C #

4 голосов
/ 18 ноября 2011

В RabbitMQ есть способ использовать его, аналогичный MSSMQ, где можно извлечь 1000 сообщений из очереди, затем выполнить вставку в базу данных и продолжить с нее.

Кажется, я не могу сделать это с подпиской на канал, а затем с помощью foreach над BasicDeliveryEventArgs в подписке, с помощью этого оператора If с максимальным числом сообщений, которое я хочу обработать в данный момент времени.

Спасибо заранее Это, однако, по-прежнему принимает все 22k сообщений из очереди

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        channel.QueueDeclare("****", true, false, false, null);

        var subscription = new Subscription(channel, "****", false);
        int maxMessages = 5;
        int i = 0;
        foreach (BasicDeliverEventArgs eventArgs in subscription)
        {
            if (++i == maxMessages)
            {
                Console.WriteLine("Took 5 messages");
                subscription.Ack(eventArgs);
                break;
            }
        }
    }
}

1 Ответ

11 голосов
/ 18 ноября 2011

Я предполагаю, что вы хотите оптимизировать загрузку сообщений в базу данных, группируя их группы в более крупные транзакции, а не оплачивая транзакцию за сообщение.С обязательным предупреждением о том, что это означает, что большие группы сообщений могут потерпеть неудачу вместе, даже если только одно из них вызывает проблему, вот как вы должны это решить ...

Установить QOS на канале:

channel.BasicQos(0, 1000, false);

Это позволит предварительно извлечь 1000 сообщений и заблокировать дальнейший трафик, пока вы не подтвердите что-либо.Обратите внимание, что он не выбирается в блоках по 1000. Скорее, он гарантирует, что максимум 1000 сообщений UNACK предварительно выбираются за один раз.Имитировать блочные передачи так же просто, как сначала обработать 1000 сообщений, а затем ACK'ить их все за один раз.

См. здесь и здесь для более авторитетногообъяснение, чем мое.

Еще один момент: вы можете очистить очередь, как только сообщения станут доступны, даже если вы не сделали квоту в 1000 сообщений.Вы должны быть в состоянии сделать это, вызывая queue.BasicGet() внутри цикла foreach, пока он не иссякнет, и затем доставив все, что у вас есть (включая сообщение, которое вы извлекли из subscription), в базу данных.Предостережение: я сам не пробовал, так что я мог бы говорить ерунду, но я думаю, что это сработает.Прелесть этого метода в том, что он сразу помещает сообщения в базу данных, не дожидаясь полной партии из 1000 сообщений.Если база данных отстает от обработки слишком большого количества небольших транзакций, резерв предварительной выборки будет просто заполняться большим между каждым циклом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...