Каков правильный метод получения сообщения, его обработки и последующей публикации? Я сталкиваюсь с множеством неподтвержденных сообщений и считаю, что происходит некоторая блокировка. Пытаюсь понять, как лучше всего поступать так.
Я работаю над набором служб, которые будут обрабатывать около 50 тыс. Запросов в день. Я решил использовать RabbitMQ и три Windows службы, написанные на Do tnet Core 3.1.
Я нарисовал схему процесса, но в основном это работает так:
- внешний служба публикует сообщение в Очереди # 1
- . Служба A "прослушивает" Очередь # 1 и принимает все сообщения, поступающие в Очередь. Выполняется вызов базы данных, а затем служба A передает сообщение в очередь №2
- служба B «прослушивает» очередь №2 и принимает все сообщения, поступающие в очередь. Выполняется некоторая внутренняя обработка, а затем служба B передает сообщение в очередь №3
- служба C «слушает» очередь №2 и принимает все сообщения, поступающие в очередь. После выполнения некоторой внутренней обработки служба C отправляет сообщение в базу данных
Пример кода ниже: изображение
protected override void OnStart(string[] args)
{
logger.LogInformation("Starting Service ...");
base.OnStart(args);
string queue = "Queue_StageOne";
this.connection = factory.CreateConnection();
this.channel = connection.CreateModel();
this.publishingChannel = connection.CreateModel();
this.channel.BasicQos(0, 1, false);
consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Recieved;
this.channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
}
private async Task Consumer_Recieved(object sender, BasicDeliverEventArgs @event)
{
var body = @event.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var inboundTransferObject = PatientObject.ConvertFromJson(message);
//logger.LogInformation("Processed message " + inboundTransferObject.WebhookMessageId);
//ServicePointManager.SecurityProtocol = SecurityProtocolType.SystemDefault;
//X509Certificate2 cert = new X509Certificate2(config["CertificationPath"].ToString(), config["PFXPassword"]);
//JToken access_token = GetAccessToken(cert);
//JObject payerData = GetPractitionerData(inboundTransferObject, cert, access_token);
//inboundTransferObject = ProcessPractitioner(inboundTransferObject, payerData);
var outboundTransferObject = Encoding.ASCII.GetBytes(inboundTransferObject.ConvertToJson());
channel.BasicAck(deliveryTag: @event.DeliveryTag, multiple: false);
publishingChannel.BasicPublish(exchange: "ExchangeA", routingKey: "Queue_StageTwo", basicProperties: null, body:outboundTransferObject);
await Task.Delay(250);
}