Я использую RabbitMQ.Client (C #) для работы с RabbitMQ.У меня проблемы с извлечением сообщений из очереди после удаления и повторного добавления обработчика событий полученных сообщений.consumer.Received -= OnMessageRecieved;
У меня сложная система, где служба Windows подписывается на очереди RabbitMQ и обрабатывает сообщения.Существует несколько потоков, выполняемых для разных целей: таймер для вызова PUSH API, другой таймер для выполнения API-аутентификации и т. Д. Если аутентификация API-интерфейса не проходит, мы не хотим обрабатывать сообщения из очереди.Мы хотим держать сообщения в состоянии готовности.Только, когда аутентификация API успешна, мы хотим обрабатывать сообщения.Таким образом, в случае сбоя мы удаляем обработчик события и в случае успеха добавляем его обратно.Когда мы это делаем, обработчик событий успешно добавляется, но теперь сообщения не извлекаются из очереди.
Чтобы смоделировать это, я создал консольное приложение.Я написал это менее чем за час, я знаю, что этот код очень сырой и грязный - пожалуйста, извините меня за это.sub.StopReceiveMessages();
имеет код, который удаляет обработчик consumer.Received -= OnMessageRecieved
.И, sub.StartReceiveMessages();
имеет код, который удаляет обработчик consumer.Received += OnMessageRecieved
.Когда вы добавляете это обратно, я думал, что это будет работать как обычно.Но он больше не попадает в MessageReceived()
.Нужно ли снова вызывать BasicConsume, хотя я использую того же потребителя?Любая помощь будет признательна.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client = RabbitMQ.Client;
namespace RabbitMQTest
{
class Program
{
static void Main(string[] args)
{
MessageBusSubscription sub = new MessageBusSubscription();
sub.Subscription("EmployeeDataChanged", "HR", "CompanyA", 5, 5000);
sub.MessagesReceived += MessageReceived;
Console.WriteLine("Press ESC to exit");
while (!(Console.KeyAvailable && Console.ReadKey(true).Key == ConsoleKey.Escape))
{
// Simulating an event where we have to stop pulling the messages from the queue
sub.StopReceiveMessages();
Thread.Sleep(2000);
// After some time, the issue is resolved and now we resume reading the messages from the queue
sub.StartReceiveMessages();
}
sub.Dispose();
Environment.Exit(0);
}
private static bool MessageReceived(string topic, string subscription, List<MessageContainer> messages)
{
List<MessageContainer> data = null;
data = messages as List<MessageContainer>;
foreach (var messageContainer in data)
{
// Do something with the message
// Ack or Reject based on some logic
}
return true;
}
}
public class MessageBusSubscription : IDisposable
{
#region variables
Client.Events.EventingBasicConsumer consumer;
Client.ConnectionFactory factory;
private System.Timers.Timer _timer = null;
private Client.IModel _channel = null;
private string _topic = string.Empty;
private string _subscription = string.Empty;
int batchCounter = 0;
int batchSize = 0;
ManualResetEvent _waitHandle = new ManualResetEvent(false);
bool _disposing = false;
bool _isSubscribed = false;
List<MessageContainer> messages = new List<MessageContainer>();
private object _processMessageLocker = new object();
public event Func<string, string, List<MessageContainer>, bool> MessagesReceived;
#endregion
public MessageBusSubscription()
{
Client.IConnection conn = GetConnection();
_channel = conn.CreateModel();
}
public void Subscription(string exchangeName, string queueName, string routingKey, int batchSize, double batchInterval)
{
_topic = exchangeName;
_subscription = queueName;
DeclareExchangeAndQueue(exchangeName, queueName, routingKey);
if (batchInterval > 0 && batchSize > 1)
{
_timer = new System.Timers.Timer(batchInterval);
_timer.Elapsed += (o, e) => {
ProcessMessagesReceived(exchangeName, queueName, true);
};
}
Subscribe(routingKey, exchangeName, queueName, batchSize, batchInterval);
}
public Task Subscribe(string routingKey, string topic, string subscription, int _batchSize, double batchInterval)
{
try
{
consumer = new Client.Events.EventingBasicConsumer(_channel);
batchCounter = 0;
batchSize = _batchSize;
consumer.Received += OnMessageRecieved;
_isSubscribed = true;
//RabbitMQ PUSH implementation using RabbitMQ.Client library
var t = Task.Factory.StartNew(() =>
{
try
{
if (_timer != null)
{
_timer.Start();
}
var queueName = string.Join(".", routingKey, topic, subscription);
if (!_disposing)
{
_channel.BasicConsume(queueName, false, consumer);
_waitHandle.WaitOne();
}
if (_timer != null)
{
_timer.Stop();
_timer.Dispose();
}
if (_channel != null)
{
if (_channel.IsOpen)
_channel.Close();
_channel.Dispose();
}
}
catch (Exception ex)
{
}
});
return t;
}
catch (Exception ex)
{
var exTask = new Task(() => { throw new AggregateException(ex); });
exTask.RunSynchronously();
return exTask;
}
}
public void OnMessageRecieved(Client.IBasicConsumer sender, Client.Events.BasicDeliverEventArgs e)
{
try
{
string sourceExchange = string.Empty;
string sourceQueue = string.Empty;
string body = Encoding.ASCII.GetString(e.Body);
string routingKey = e.RoutingKey;
ulong deliveryTag = e.DeliveryTag;
sourceExchange = "";
sourceQueue = "";
MessageContainer msgContainer = new MessageContainer();
msgContainer.Message = body;
batchCounter++;
msgContainer.DeliveryTag = deliveryTag;
lock (_processMessageLocker)
{
messages.Add(msgContainer);
ProcessMessagesReceived(_topic, _subscription, false);
}
}
catch (Exception ex)
{
}
}
public void ProcessMessagesReceived(string topic, string subscription, bool hasTimerElapsed)
{
try
{
// if it's the last message in the batch, or the interval has elapsed
if ((batchCounter % batchSize == 0 && messages.Count > 0) || (hasTimerElapsed && messages.Count > 0))
{
if (_timer != null)
{
_timer.Stop();
}
lock (_processMessageLocker)
{
// process the message
if (!MessagesReceived(topic, subscription, messages))
{
throw new Exception("Message processing exception - look in the default error queue (broker)");
}
messages.Clear();
}
batchCounter = 0;
if (_timer != null)
{
_timer.Start();
}
}
}
catch (Exception ex)
{
throw ex;
}
}
public Client.IConnection GetConnection()
{
factory = new Client.ConnectionFactory();
factory.UserName = "guest";
factory.Password = "guest";
factory.VirtualHost = "/";
factory.HostName = "localhost";
factory.Protocol = Client.Protocols.AMQP_0_9_1;
factory.Port = 5673;
return factory.CreateConnection();
}
public void DeclareExchangeAndQueue(string exchangeName, string queueName, string routingKey)
{
using (var exchangeConn = factory.CreateConnection())
using (Client.IModel channel = exchangeConn.CreateModel())
{
channel.ExchangeDeclare(exchangeName, Client.ExchangeType.Direct);
var queue = String.Join(".", routingKey, exchangeName, queueName);
channel.QueueDeclare(queue, false, false, false, null);
channel.QueueBind(queue, exchangeName, routingKey, null);
}
}
public void StartReceiveMessages()
{
if (_timer != null && !_isSubscribed)
{
_timer.Start();
}
if (consumer != null && !_isSubscribed)
{
consumer.Received += OnMessageRecieved;
_isSubscribed = true;
}
}
public void StopReceiveMessages()
{
if (_timer != null)
{
_timer.Stop();
}
if (consumer != null)
{
consumer.Received -= OnMessageRecieved;
_isSubscribed = false;
}
}
public void Dispose()
{
_disposing = true;
_waitHandle.Set();
_waitHandle?.Dispose();
_waitHandle = null;
if (_timer != null)
{
_timer.Stop();
_timer.Dispose();
}
}
}
public class MessageContainer
{
public ulong DeliveryTag { get; set; }
public string Message { get; set; }
}
}