Проблема с RabbitMQ и плагином дедупликации - PullRequest
0 голосов
/ 18 мая 2019

Я новичок в RabbitMQ;для нового проекта мне нужно использовать плагин дедупликации.Я использую рабочий процесс AspNet Core 3.0, а язык - C #.

Я попробовал очень простой пример: 2 издателя отправляют 10 сообщений с номерами от 1 до 10, а один потребитель получает сообщения и подтверждает их.

У меня довольно странные и непредсказуемые результаты:

, если я запускаю 3 рабочих (2 издателя и одного потребителя) в одном и том же процессе, похоже, что плагин дедупликации работает нормально и вставляется в очередьтолько 10 уникальных сообщений, но потребитель читает только первые 2 и подтверждает только одно из них.

если я запускаю издателей и потребителей в двух разных процессах, потребитель получает все 10 сообщений, но после подтверждения сообщения остаютсяв очереди, и если я снова запускаю потребительский процесс, они снова обрабатываются.

Я пытался найти в Google какой-то полный рабочий образец для дедупликации, но безуспешно

Publisher

        int cnt = 1;
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            Dictionary<string, object> dd = new Dictionary<string, object>();
            dd["x-message-deduplication"] = true;
            channel.QueueDeclare(queue: qname,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: dd);

            while (!stoppingToken.IsCancellationRequested)
            {
                var message = GetMessage(cnt);
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                Dictionary<string, object> d = new Dictionary<string, object>();
                d["x-deduplication-header"] = cnt;
                properties.Headers = d;

                channel.BasicPublish(exchange: "",
                                     routingKey: qname,
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);

                logDB(cnt, "Sender"+Wname);
                cnt++;
                if (cnt > 10)
                    break;
                await Task.Delay(1000, stoppingToken);
            }

Потребитель:

        while (!stoppingToken.IsCancellationRequested)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                Dictionary<string, object> dd = new Dictionary<string, object>();
                dd["x-message-deduplication"] = true;
                channel.QueueDeclare(queue: qname,
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: dd);

                _logger.LogInformation("{0} Waiting for messages.", Cname);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    _logger.LogInformation("{0} Received {1}", Cname, message);


                    string[] parts = message.Split('-');
                    int cntmsg = int.Parse(parts[1]);

                    logDB(cntmsg, Cname);

                    Thread.Sleep((cntmsg % 5) * 1000);


                    _logger.LogInformation("{0} Received {1} done", Cname, message);

                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
                };
                channel.BasicConsume(queue: qname,
                                     autoAck: false,
                                     consumer: consumer);

                _logger.LogInformation("{0} After BasicConsume", Cname);

                while (true)
                    await Task.Delay(1000, stoppingToken);

            }

1 Ответ

0 голосов
/ 13 июня 2019

после обращения к разработчику плагина дедупликации выяснилось, что проблема связана с типом (int) заголовка дедупликации, работает строковое значение.

Он скоро выпустит новую версию, поддерживающую тип данных int.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...