Восстановление ActiveMQ после потери соединения или любого исключения - PullRequest
0 голосов
/ 23 марта 2020

Итак, у нас есть лог c для чтения сообщений из очереди:

///<inheritdoc/>
        public void ReceiveFromQueue<T>(IMessageHandler<T> callbackHandler, string queuePrefix, string consumer) where T : class, IMessageBase
        {
            if (callbackHandler == null)
            {
                _logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(callbackHandler)} is null!");
                throw new MessageBusExcepiton($"{nameof(callbackHandler)} is null!", null);
            }

            if (string.IsNullOrEmpty(queuePrefix))
            {
                _logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(queuePrefix)} is null!");
                throw new MessageBusExcepiton($"{nameof(queuePrefix)} is Empty!", null);
            }

            if (string.IsNullOrEmpty(queuePrefix))
            {
                _logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(consumer)} is null!");
                throw new MessageBusExcepiton($"{nameof(consumer)} is Empty!", null);
            }

            try
            {
                //CREATE CONNECTION
                Uri uri = new Uri(_settings.BrokerUri);
                _logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create ActiveMQ factory [{_settings.BrokerUri}]");
                ConnectionFactory factory = new ConnectionFactory(uri);

                IConnection connection = factory.CreateConnection(_settings.Username, _settings.Password);

                _logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create session to ActiveMQ Broker [{_settings.BrokerUri}] for user [{_settings.Username}]");
                ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);

                IQueue dest = session.GetQueue(GenerateQueueName(queuePrefix, consumer));
                IMessageConsumer messageConsumer = session.CreateConsumer(dest);

                _logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to start connection to ActiveMQ Queue [{dest.QueueName}]");
                connection.Start();

                //Add message handler for consuming messages
                messageConsumer.Listener += new MessageListener((receivedMsg) =>
                {
                    _logger.LogDebug($"Message received on [{dest.QueueName}]", receivedMsg);

                    //try to cast
                    var msg = receivedMsg as IObjectMessage;
                    var body = msg?.Body as T;

                    //In case of unsucessful cast or empty message body will be null
                    if(body == null)
                    {
                        var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = $"Unsuccessful cast to {typeof(T)}" };
                        _logger.LogDebug($"Message failed on [{dest.QueueName}] for Reason: [\"Unsuccessful cast to {typeof(T)}\"]", receivedMsg);

                        //add poison message to queue
                    }

                    try
                    {
                        if (callbackHandler.Handle(body))
                        {
                            msg.Acknowledge();
                            _logger.LogDebug($"Message {body.MessageId} acknowledged on [{dest.QueueName}]");
                        }

                    } catch(Exception e)
                    {
                        var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = e.Message };
                        _logger.LogError($"Message handle failed on [{dest.QueueName}] for Reason: [{e.Message}]", e);

                        //add poison message to queue
                    }
                });
            }
            catch (NMSSecurityException exc)
            {
                _logger.LogError($"Error while communicating to ActiveMQ Service!", exc);
            }
            catch (Exception e)
            {
                _logger.LogError($"Error on {nameof(ReceiveFromQueue)}", e);
            }
        }

И наша цель - иметь непрерывного прослушивателя в очереди, который будет получать уведомления, когда в очереди есть некоторые сообщения. Мы используем отказоустойчивое соединение, поэтому я хочу узнать, достаточно ли этого, чтобы скрыть мерцание и сбой сети, чтобы этот слушатель не остановился? Как сделать правильное восстановление в случае каких-либо ошибок? Нужно ли снова регистрировать слушателя или какова наилучшая практика в этом случае? Это будет использоваться несколькими различными сервисами.

...