Правильный способ потреблять и немедленно публиковать sh RabbitMQ сообщений из очереди в очередь? - PullRequest
1 голос
/ 28 мая 2020

Каков правильный метод получения сообщения, его обработки и последующей публикации? Я сталкиваюсь с множеством неподтвержденных сообщений и считаю, что происходит некоторая блокировка. Пытаюсь понять, как лучше всего поступать так.

Я работаю над набором служб, которые будут обрабатывать около 50 тыс. Запросов в день. Я решил использовать RabbitMQ и три Windows службы, написанные на Do tnet Core 3.1.

Я нарисовал схему процесса, но в основном это работает так:

  • внешний служба публикует сообщение в Очереди # 1
  • . Служба A "прослушивает" Очередь # 1 и принимает все сообщения, поступающие в Очередь. Выполняется вызов базы данных, а затем служба A передает сообщение в очередь №2
  • служба B «прослушивает» очередь №2 и принимает все сообщения, поступающие в очередь. Выполняется некоторая внутренняя обработка, а затем служба B передает сообщение в очередь №3
  • служба C «слушает» очередь №2 и принимает все сообщения, поступающие в очередь. После выполнения некоторой внутренней обработки служба C отправляет сообщение в базу данных

Пример кода ниже: изображение

enter image description here

protected override void OnStart(string[] args)
    {

        logger.LogInformation("Starting Service ...");

        base.OnStart(args);
        string queue = "Queue_StageOne";

        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();
        this.publishingChannel = connection.CreateModel();
        this.channel.BasicQos(0, 1, false);

        consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += Consumer_Recieved;

        this.channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }

    private async Task Consumer_Recieved(object sender, BasicDeliverEventArgs @event)
    {
        var body = @event.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        var inboundTransferObject = PatientObject.ConvertFromJson(message);

        //logger.LogInformation("Processed message " + inboundTransferObject.WebhookMessageId);

        //ServicePointManager.SecurityProtocol = SecurityProtocolType.SystemDefault;
        //X509Certificate2 cert = new X509Certificate2(config["CertificationPath"].ToString(), config["PFXPassword"]);

        //JToken access_token = GetAccessToken(cert);

        //JObject payerData = GetPractitionerData(inboundTransferObject, cert, access_token);

        //inboundTransferObject = ProcessPractitioner(inboundTransferObject, payerData);

        var outboundTransferObject = Encoding.ASCII.GetBytes(inboundTransferObject.ConvertToJson());

        channel.BasicAck(deliveryTag: @event.DeliveryTag, multiple: false);
        publishingChannel.BasicPublish(exchange: "ExchangeA", routingKey: "Queue_StageTwo", basicProperties: null, body:outboundTransferObject);
        await Task.Delay(250);

    }

1 Ответ

0 голосов
/ 29 мая 2020

Не совсем ясно, о чем вы здесь спрашиваете, но выделяется одна вещь: ваши службы не должны подтверждать входящее сообщение до тех пор, пока они не завершат все этапы обработки, включая публикацию последующих исходящих сообщений. Сообщения. В вашем примере кода вы, кажется, подтверждаете входящее сообщение до публикации исходящего сообщения.

Это, однако, не объясняет описанный вами симптом «Я столкнулся с множеством неподтвержденных сообщений». Когда вы сталкиваетесь с этим? Сколько это много? Вы установили лимит предварительной выборки для своего канала? В целях тестирования вы можете попробовать установить счетчик упреждающей выборки равным единице, чтобы гарантировать, что одновременно будет передаваться только одно сообщение.

channel.BasicQos(1, global: true)

См. в этом разделе документации RabbitMQ:

«Поскольку сообщения отправляются (проталкиваются) клиентам асинхронно, обычно на канале в любой данный момент находится более одного сообщения« в полете ». Кроме того, ручные подтверждения от клиентов также по своей природе асинхронны. Поэтому существует скользящее окно с неподтвержденными тегами доставки. Разработчики часто предпочитают ограничивать размер этого окна, чтобы избежать проблемы с неограниченным буфером на стороне потребителя. выполняется путем установки значения «prefetch count» с помощью метода basi c .qos. Это значение определяет максимальное количество неподтвержденных доставок, разрешенных на канале. Как только количество достигнет настроенного количества, RabbitMQ перестанет доставлять мне больше сообщений на канале, если хотя бы один из постоянные признаются. "

...