Я пытаюсь использовать MassTransit для обмена данными между запросами и ответами через очередь служебной шины Azure.Отправитель - это веб-приложение Azure, Consumer - служба Windows, установленная на локальном компьютере.
Все отлично работает, когда речь идет о небольших объемах сообщений.Однако, как только я начинаю отправлять более ~ 20 мсг / с, я вижу серьезные (1-2 с) задержки ответов от потребителя.Моя телеметрия говорит мне, что задержка происходит в тот момент, когда потребителю нужно получить сообщения из очереди.
Одна странная, но я думаю важная часть поведения: я вижу, что при текущей загрузке количество непрочитанных сообщений в очередина константе avg и его 25. Если я отправлю в 2 раза больше сообщений, чем я вижу на avg 50 сообщений в очереди.С задержками на стороне потребления я бы ожидал, что очередь вырастет, но она постоянна, так что это определенно что-то внутри кода, который душит соединение.
Краткая информация:
- Нетпроблемы с оборудованием на машине.CPU / Mem не высокий.
- Я пытался поиграть с конфигами UseConcurrencyLimit, MaxConcurrentCalls, PrefetchCount на стороне приемника.Это не помогло
- Мой код решения отправителя и потребителя находится рядом с классическими примерами.
Потребитель: .Net framework 4.7.2 и MassTransit.Azure.ServiceBus.Core 5.5.2
Вот мой класс слушателя со всей удаленной бизнес-логикой:
public class QueueListener
{
private IBusControl Bus { get; set; }
public QueueListener()
{
Bus = MassTransit.Bus.Factory.CreateUsingAzureServiceBus(serviceBusFactoryConfigurator =>
{
var host = serviceBusFactoryConfigurator.Host(SettingsHelper.AzureServiceBusConnectionString,
(config) =>
{
config.OperationTimeout = TimeSpan.FromSeconds(60);
config.TransportType = TransportType.AmqpWebSockets;
});
serviceBusFactoryConfigurator.ReceiveEndpoint(host, SettingsHelper.CouponQueryQueueName, e =>
{
e.Handler<JToken>(HandleMessage);
e.UseConcurrencyLimit(16);
e.MaxConcurrentCalls = 16;
e.PrefetchCount = 32;
});
serviceBusFactoryConfigurator.EnableBatchedOperations = true;
serviceBusFactoryConfigurator.DefaultMessageTimeToLive = TimeSpan.FromSeconds(60);
});
}
private async Task HandleMessage(ConsumeContext context)
{
await Task.Delay(800);
if (context.ExpirationTime > SystemDateTime.Now)
{
await context.RespondAsync(new CouponUsedList { CouponsUsed = new List<CouponCurrentUsed>() });
}
}
public Task LaunchAsync()
{
return Bus.StartAsync();
}
public Task StopAsync()
{
return Bus.StopAsync();
}
}