Итак, у нас есть лог c для чтения сообщений из очереди:
///<inheritdoc/>
public void ReceiveFromQueue<T>(IMessageHandler<T> callbackHandler, string queuePrefix, string consumer) where T : class, IMessageBase
{
if (callbackHandler == null)
{
_logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(callbackHandler)} is null!");
throw new MessageBusExcepiton($"{nameof(callbackHandler)} is null!", null);
}
if (string.IsNullOrEmpty(queuePrefix))
{
_logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(queuePrefix)} is null!");
throw new MessageBusExcepiton($"{nameof(queuePrefix)} is Empty!", null);
}
if (string.IsNullOrEmpty(queuePrefix))
{
_logger.LogError($"{nameof(ReceiveFromQueue)} error: {nameof(consumer)} is null!");
throw new MessageBusExcepiton($"{nameof(consumer)} is Empty!", null);
}
try
{
//CREATE CONNECTION
Uri uri = new Uri(_settings.BrokerUri);
_logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create ActiveMQ factory [{_settings.BrokerUri}]");
ConnectionFactory factory = new ConnectionFactory(uri);
IConnection connection = factory.CreateConnection(_settings.Username, _settings.Password);
_logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to create session to ActiveMQ Broker [{_settings.BrokerUri}] for user [{_settings.Username}]");
ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
IQueue dest = session.GetQueue(GenerateQueueName(queuePrefix, consumer));
IMessageConsumer messageConsumer = session.CreateConsumer(dest);
_logger.LogDebug($"{nameof(ReceiveFromQueue)} : Trying to start connection to ActiveMQ Queue [{dest.QueueName}]");
connection.Start();
//Add message handler for consuming messages
messageConsumer.Listener += new MessageListener((receivedMsg) =>
{
_logger.LogDebug($"Message received on [{dest.QueueName}]", receivedMsg);
//try to cast
var msg = receivedMsg as IObjectMessage;
var body = msg?.Body as T;
//In case of unsucessful cast or empty message body will be null
if(body == null)
{
var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = $"Unsuccessful cast to {typeof(T)}" };
_logger.LogDebug($"Message failed on [{dest.QueueName}] for Reason: [\"Unsuccessful cast to {typeof(T)}\"]", receivedMsg);
//add poison message to queue
}
try
{
if (callbackHandler.Handle(body))
{
msg.Acknowledge();
_logger.LogDebug($"Message {body.MessageId} acknowledged on [{dest.QueueName}]");
}
} catch(Exception e)
{
var pMessage = new PoisonMessageModel() { Message = receivedMsg, MessageId = Guid.NewGuid().ToString(), Reason = e.Message };
_logger.LogError($"Message handle failed on [{dest.QueueName}] for Reason: [{e.Message}]", e);
//add poison message to queue
}
});
}
catch (NMSSecurityException exc)
{
_logger.LogError($"Error while communicating to ActiveMQ Service!", exc);
}
catch (Exception e)
{
_logger.LogError($"Error on {nameof(ReceiveFromQueue)}", e);
}
}
И наша цель - иметь непрерывного прослушивателя в очереди, который будет получать уведомления, когда в очереди есть некоторые сообщения. Мы используем отказоустойчивое соединение, поэтому я хочу узнать, достаточно ли этого, чтобы скрыть мерцание и сбой сети, чтобы этот слушатель не остановился? Как сделать правильное восстановление в случае каких-либо ошибок? Нужно ли снова регистрировать слушателя или какова наилучшая практика в этом случае? Это будет использоваться несколькими различными сервисами.