RabbitMQ не получает сообщения после сброса OnMessageReceived - PullRequest
0 голосов
/ 27 февраля 2019

Я использую RabbitMQ.Client (C #) для работы с RabbitMQ.У меня проблемы с извлечением сообщений из очереди после удаления и повторного добавления обработчика событий полученных сообщений.consumer.Received -= OnMessageRecieved; У меня сложная система, где служба Windows подписывается на очереди RabbitMQ и обрабатывает сообщения.Существует несколько потоков, выполняемых для разных целей: таймер для вызова PUSH API, другой таймер для выполнения API-аутентификации и т. Д. Если аутентификация API-интерфейса не проходит, мы не хотим обрабатывать сообщения из очереди.Мы хотим держать сообщения в состоянии готовности.Только, когда аутентификация API успешна, мы хотим обрабатывать сообщения.Таким образом, в случае сбоя мы удаляем обработчик события и в случае успеха добавляем его обратно.Когда мы это делаем, обработчик событий успешно добавляется, но теперь сообщения не извлекаются из очереди.

Чтобы смоделировать это, я создал консольное приложение.Я написал это менее чем за час, я знаю, что этот код очень сырой и грязный - пожалуйста, извините меня за это.sub.StopReceiveMessages(); имеет код, который удаляет обработчик consumer.Received -= OnMessageRecieved.И, sub.StartReceiveMessages(); имеет код, который удаляет обработчик consumer.Received += OnMessageRecieved.Когда вы добавляете это обратно, я думал, что это будет работать как обычно.Но он больше не попадает в MessageReceived().Нужно ли снова вызывать BasicConsume, хотя я использую того же потребителя?Любая помощь будет признательна.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client = RabbitMQ.Client;

namespace RabbitMQTest
{
    class Program
    {


        static void Main(string[] args)
        {
            MessageBusSubscription sub = new MessageBusSubscription();

            sub.Subscription("EmployeeDataChanged", "HR", "CompanyA", 5, 5000);
            sub.MessagesReceived += MessageReceived;
            Console.WriteLine("Press ESC to exit");
            while (!(Console.KeyAvailable && Console.ReadKey(true).Key == ConsoleKey.Escape))
            {
                // Simulating an event where we have to stop pulling the messages from the queue
                sub.StopReceiveMessages();

                Thread.Sleep(2000);

                // After some time, the issue is resolved and now we resume reading the messages from the queue
                sub.StartReceiveMessages();
            }
            sub.Dispose();
            Environment.Exit(0);
        }

        private static bool MessageReceived(string topic, string subscription, List<MessageContainer> messages)
        {
            List<MessageContainer> data = null;
            data = messages as List<MessageContainer>;

            foreach (var messageContainer in data)
            {
                // Do something with the message
                // Ack or Reject based on some logic
            }

            return true;
        }
    }
    public class MessageBusSubscription : IDisposable
    {
        #region variables
        Client.Events.EventingBasicConsumer consumer;
        Client.ConnectionFactory factory;        
        private System.Timers.Timer _timer = null;
        private Client.IModel _channel = null;
        private string _topic = string.Empty;
        private string _subscription = string.Empty;
        int batchCounter = 0;
        int batchSize = 0;
        ManualResetEvent _waitHandle = new ManualResetEvent(false);
        bool _disposing = false;
        bool _isSubscribed = false;
        List<MessageContainer> messages = new List<MessageContainer>();
        private object _processMessageLocker = new object();
        public event Func<string, string, List<MessageContainer>, bool> MessagesReceived;
        #endregion
        public MessageBusSubscription()
        {
            Client.IConnection conn = GetConnection();
            _channel = conn.CreateModel();
        }
        public void Subscription(string exchangeName, string queueName, string routingKey, int batchSize, double batchInterval)
        {
            _topic = exchangeName;
            _subscription = queueName;
            DeclareExchangeAndQueue(exchangeName, queueName, routingKey);

            if (batchInterval > 0 && batchSize > 1)
            {
                _timer = new System.Timers.Timer(batchInterval);

                _timer.Elapsed += (o, e) => {
                    ProcessMessagesReceived(exchangeName, queueName, true);
                };
            }
            Subscribe(routingKey, exchangeName, queueName, batchSize, batchInterval);
        }
        public Task Subscribe(string routingKey, string topic, string subscription, int _batchSize, double batchInterval)
        {
            try
            {
                consumer = new Client.Events.EventingBasicConsumer(_channel);

                batchCounter = 0;
                batchSize = _batchSize;

                consumer.Received += OnMessageRecieved;
                _isSubscribed = true;

                //RabbitMQ PUSH implementation using RabbitMQ.Client library
                var t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        if (_timer != null)
                        {
                            _timer.Start();
                        }

                        var queueName = string.Join(".", routingKey, topic, subscription);

                        if (!_disposing)
                        {
                            _channel.BasicConsume(queueName, false, consumer);
                            _waitHandle.WaitOne();
                        }

                        if (_timer != null)
                        {
                            _timer.Stop();
                            _timer.Dispose();
                        }

                        if (_channel != null)
                        {
                            if (_channel.IsOpen)
                                _channel.Close();
                            _channel.Dispose();
                        }
                    }
                    catch (Exception ex)
                    {

                    }

                });

                return t;
            }
            catch (Exception ex)
            {
                var exTask = new Task(() => { throw new AggregateException(ex); });
                exTask.RunSynchronously();
                return exTask;
            }
        }
        public void OnMessageRecieved(Client.IBasicConsumer sender, Client.Events.BasicDeliverEventArgs e)
        {
            try
            {
                string sourceExchange = string.Empty;
                string sourceQueue = string.Empty;

                string body = Encoding.ASCII.GetString(e.Body);
                string routingKey = e.RoutingKey;
                ulong deliveryTag = e.DeliveryTag;
                sourceExchange = "";
                sourceQueue = "";
                MessageContainer msgContainer = new MessageContainer();
                msgContainer.Message = body;

                batchCounter++;
                msgContainer.DeliveryTag = deliveryTag;

                lock (_processMessageLocker)
                {
                    messages.Add(msgContainer);
                    ProcessMessagesReceived(_topic, _subscription, false);
                }
            }
            catch (Exception ex)
            {

            }

        }
        public void ProcessMessagesReceived(string topic, string subscription, bool hasTimerElapsed)
        {
            try
            {
                // if it's the last message in the batch, or the interval has elapsed
                if ((batchCounter % batchSize == 0 && messages.Count > 0) || (hasTimerElapsed && messages.Count > 0))
                {
                    if (_timer != null)
                    {
                        _timer.Stop();
                    }

                    lock (_processMessageLocker)
                    {
                        // process the message
                        if (!MessagesReceived(topic, subscription, messages))
                        {
                            throw new Exception("Message processing exception - look in the default error queue (broker)");
                        }
                        messages.Clear();
                    }

                    batchCounter = 0;
                    if (_timer != null)
                    {
                        _timer.Start();
                    }
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
        public Client.IConnection GetConnection()
        {
            factory = new Client.ConnectionFactory();

            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "localhost";
            factory.Protocol = Client.Protocols.AMQP_0_9_1;
            factory.Port = 5673;

            return factory.CreateConnection();
        }
        public void DeclareExchangeAndQueue(string exchangeName, string queueName, string routingKey)
        {
            using (var exchangeConn = factory.CreateConnection())
            using (Client.IModel channel = exchangeConn.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, Client.ExchangeType.Direct);

                var queue = String.Join(".", routingKey, exchangeName, queueName);
                channel.QueueDeclare(queue, false, false, false, null);
                channel.QueueBind(queue, exchangeName, routingKey, null);
            }
        }
        public void StartReceiveMessages()
        {
            if (_timer != null && !_isSubscribed)
            {
                _timer.Start();
            }

            if (consumer != null && !_isSubscribed)
            {
                consumer.Received += OnMessageRecieved;
                _isSubscribed = true;
            }            
        }
        public void StopReceiveMessages()
        {
            if (_timer != null)
            {
                _timer.Stop();
            }

            if (consumer != null)
            {
                consumer.Received -= OnMessageRecieved;
                _isSubscribed = false;
            }                        
        }
        public void Dispose()
        {
            _disposing = true;
            _waitHandle.Set();
            _waitHandle?.Dispose();
            _waitHandle = null;

            if (_timer != null)
            {
                _timer.Stop();
                _timer.Dispose();
            }
        }
    }
    public class MessageContainer
    {
        public ulong DeliveryTag { get; set; }
        public string Message { get; set; }
    }
}

1 Ответ

0 голосов
/ 28 февраля 2019

Не отмените подписку на событие Received, вместо этого используйте метод BasicCancel, чтобы прекратить потребление сообщений, затем снова используйте BasicConsume, чтобы начать потребление.

Большая часть кода синхронизации потокаа запуск потребителя в Task на самом деле не лучшая практика.Если вам нужна дополнительная помощь с этим кодом, сохраните его в git-репозитории или где-нибудь в gist и просмотрите официальный список рассылки.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы в StackOverflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...