Я новичок в RabbitMQ;для нового проекта мне нужно использовать плагин дедупликации.Я использую рабочий процесс AspNet Core 3.0, а язык - C #.
Я попробовал очень простой пример: 2 издателя отправляют 10 сообщений с номерами от 1 до 10, а один потребитель получает сообщения и подтверждает их.
У меня довольно странные и непредсказуемые результаты:
, если я запускаю 3 рабочих (2 издателя и одного потребителя) в одном и том же процессе, похоже, что плагин дедупликации работает нормально и вставляется в очередьтолько 10 уникальных сообщений, но потребитель читает только первые 2 и подтверждает только одно из них.
если я запускаю издателей и потребителей в двух разных процессах, потребитель получает все 10 сообщений, но после подтверждения сообщения остаютсяв очереди, и если я снова запускаю потребительский процесс, они снова обрабатываются.
Я пытался найти в Google какой-то полный рабочий образец для дедупликации, но безуспешно
Publisher
int cnt = 1;
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
Dictionary<string, object> dd = new Dictionary<string, object>();
dd["x-message-deduplication"] = true;
channel.QueueDeclare(queue: qname,
durable: true,
exclusive: false,
autoDelete: false,
arguments: dd);
while (!stoppingToken.IsCancellationRequested)
{
var message = GetMessage(cnt);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> d = new Dictionary<string, object>();
d["x-deduplication-header"] = cnt;
properties.Headers = d;
channel.BasicPublish(exchange: "",
routingKey: qname,
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
logDB(cnt, "Sender"+Wname);
cnt++;
if (cnt > 10)
break;
await Task.Delay(1000, stoppingToken);
}
Потребитель:
while (!stoppingToken.IsCancellationRequested)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
Dictionary<string, object> dd = new Dictionary<string, object>();
dd["x-message-deduplication"] = true;
channel.QueueDeclare(queue: qname,
durable: true,
exclusive: false,
autoDelete: false,
arguments: dd);
_logger.LogInformation("{0} Waiting for messages.", Cname);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
_logger.LogInformation("{0} Received {1}", Cname, message);
string[] parts = message.Split('-');
int cntmsg = int.Parse(parts[1]);
logDB(cntmsg, Cname);
Thread.Sleep((cntmsg % 5) * 1000);
_logger.LogInformation("{0} Received {1} done", Cname, message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
};
channel.BasicConsume(queue: qname,
autoAck: false,
consumer: consumer);
_logger.LogInformation("{0} After BasicConsume", Cname);
while (true)
await Task.Delay(1000, stoppingToken);
}