C # Потребление / Чтение сообщения каждый интервал x с размером пакета y От Rabbit MQ - PullRequest
0 голосов
/ 11 июля 2019

Я использовал Rabbit MQ для обработки сообщений в последовательном порядке.Теперь у меня есть сценарий, где не нужно сразу принимать сообщения, но я хотел бы получать / читать сообщения из MQ каждые 1 (х) минут с размером пакета 20 (у)

, поэтому я могу обработатьэто 20 сообщений за один раз и сохранить в БД за один вызов вместо 20 раз для каждого сообщения.

Итак, как получать / получать сообщения в пакетном режиме на каждом интервале x.

У меня естьвидел следующую информацию, Использование сообщений в пакетном режиме - RabbitMQ и Rabbitmq получают несколько сообщений с помощью одного синхронного вызова с использованием .NET

Я пытался реализовать реализацию 2-го вопроса (Вопросы /32309155) но не работает, не понял, "потребитель. Получено" получит ** _ fetchSize ==> 20 ** значит, он получит 20 сообщений за одно чтение?или как это будет работать, потому что я изменил размер fetchsize на 10, но consumer.received получает одно сообщение.

using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.BasicQos(0, 1, false);
                channel.ExchangeDeclare("helloExchange", type:"direct");
                channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false,
                    arguments: null);

                channel.QueueBind("hello", "helloExchange", routingKey:"hello");

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    bool canAck = false;
                    var retryCount = 0;



                    try
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                       // DO PROCESS MESSAGE HERE
                        Console.WriteLine($"{typeof(MyConsumer).Name} Message consumed {message}");
                        canAck = true;
                    }
                    catch (Exception ex)
                    {canAck = false;
                      // LOG ERROR
                    }

                    try
                    {
                        if (canAck)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            channel.BasicNack(ea.DeliveryTag, false, false);
                        }
                    }
                    catch (AlreadyClosedException ex)
                    {
                        Console.WriteLine(ex.Message + " >> RabbitMQ is closed!");
                    }
                };
                channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

1 Ответ

0 голосов
/ 12 июля 2019

Потребитель будет всегда получать только одно сообщение за раз. Размер выборки будет определять, сколько неподтвержденных сообщений может быть доставлено потребителю по каналу.

Итак, рассмотрим размер выборки 3, например, 3 сообщения будут доставлены, но если вы не подтвердите их, это означает, что даже если вы получите новое сообщение, поступающее в очередь (4-е), оно не будет отправлено канал (таким образом, не используется), пока вы не подтвердите хотя бы одно из 3 исходных сообщений.

В вашем случае, чтобы иметь возможность обрабатывать ваш сценарий и принимать сообщения в пакетном режиме, вы можете сделать что-то вроде:

  • установить размер выборки 20
  • каждый раз, когда вы получаете новое сохраненное сообщение, добавляете его в список
  • как только вы достигнете предела (20), вы начнете его обрабатывать и подтвердите все сообщения в пакете (таким образом очистите список)
  • когда пакет завершен и все сообщения подтверждены, вы начнете получать новые сообщения (затем снова начнете поток)

Даже если это технически возможно, я не буду использовать эту реализацию, поскольку она усложнит обработку повторных попыток / обработку ошибок и т. Д. Используя очередь сообщений, вы можете сделать свой код атомарным, идемпотентным и устойчивым, что будет облегчить обработку ошибок и повторных попыток.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...