Я пытаюсь провести интеграционный тест некоторого кода RabbitMQ.Прямо сейчас у меня есть класс RabbitMqConsumer
с методом BeginConsuming()
, который выглядит следующим образом:
public void BeginConsuming()
{
var messageConsumer = new EventingBasicConsumer(ConsumerChannel);
//Consumes messages from queue
messageConsumer.Received += (model, eventArgs) =>
{
var message = Encoding.UTF8.GetString(eventArgs.Body);
HandleMessage(message);
// Send a message to the stats service to recalculate the stats when each game ends
if (GameplayMessageHandler.GetMessageType(message) == "gameEnd")
{
var jsonMsgObject = (JObject)JsonConvert.DeserializeObject(message);
var gameGuid = (Guid) jsonMsgObject["game_guid"];
SendPostGameStatsEventToStatsService(gameGuid);
}
// After *everything* is done for processing this message, fire the message handled event
MessageHandled?.Invoke();
};
ConsumerChannel.BasicConsume(
queue: _rabbitSettings.GamesQueueName,
autoAck: true,
consumer: messageConsumer);
Log.Logger.Information("Now consuming RabbitMq messages in channel: {queueName}", _rabbitSettings.GamesQueueName);
}
Я объявил следующее событие / delgate в классе:
public delegate void OnMessageHandledListener();
public event OnMessageHandledListener MessageHandled;
Идея состоит в том, чтобы вызвать это событие, когда RabbitMQ завершит обработку каждого сообщения.Затем я могу написать тест, который выглядит следующим образом:
model.BasicPublish("", "GameEvents", false, null, msgBytes);
Assert.True(fakeRabbitServer.Queues["GameEvents"].Messages.Count.Equals(1));
gameEventsRabbitConsumer.MessageHandled += () =>
{
Assert.True(fakeRabbitServer.Queues["GameEvents"].Messages.Count.Equals(0));
};
gameEventsRabbitConsumer.BeginConsuming();
Однако этот тест не выполняется в строке Assert.True(fakeRabbitServer.Queues["GameEvents"].Messages.Count.Equals(0));
Почему моя очередь не очищается в этот момент?Это ошибка в том, как я обращаюсь с асинхронной природой вещей?Как я могу это исправить?