MassTransit servicebuse оптимизирует потребителя - PullRequest
1 голос
/ 01 июля 2019

Я пытаюсь использовать MassTransit для обмена данными между запросами и ответами через очередь служебной шины Azure.Отправитель - это веб-приложение Azure, Consumer - служба Windows, установленная на локальном компьютере.

Все отлично работает, когда речь идет о небольших объемах сообщений.Однако, как только я начинаю отправлять более ~ 20 мсг / с, я вижу серьезные (1-2 с) задержки ответов от потребителя.Моя телеметрия говорит мне, что задержка происходит в тот момент, когда потребителю нужно получить сообщения из очереди.

Одна странная, но я думаю важная часть поведения: я вижу, что при текущей загрузке количество непрочитанных сообщений в очередина константе avg и его 25. Если я отправлю в 2 раза больше сообщений, чем я вижу на avg 50 сообщений в очереди.С задержками на стороне потребления я бы ожидал, что очередь вырастет, но она постоянна, так что это определенно что-то внутри кода, который душит соединение.

Краткая информация:

  • Нетпроблемы с оборудованием на машине.CPU / Mem не высокий.
  • Я пытался поиграть с конфигами UseConcurrencyLimit, MaxConcurrentCalls, PrefetchCount на стороне приемника.Это не помогло
  • Мой код решения отправителя и потребителя находится рядом с классическими примерами.

Потребитель: .Net framework 4.7.2 и MassTransit.Azure.ServiceBus.Core 5.5.2

Вот мой класс слушателя со всей удаленной бизнес-логикой:

public class QueueListener
{
    private IBusControl Bus { get; set; }

    public QueueListener()
    {
        Bus = MassTransit.Bus.Factory.CreateUsingAzureServiceBus(serviceBusFactoryConfigurator =>
        {
            var host = serviceBusFactoryConfigurator.Host(SettingsHelper.AzureServiceBusConnectionString,
                (config) =>
                {
                    config.OperationTimeout = TimeSpan.FromSeconds(60);
                    config.TransportType = TransportType.AmqpWebSockets;
                });
            serviceBusFactoryConfigurator.ReceiveEndpoint(host, SettingsHelper.CouponQueryQueueName, e =>
            {
                e.Handler<JToken>(HandleMessage);

                e.UseConcurrencyLimit(16);
                e.MaxConcurrentCalls = 16;
                e.PrefetchCount = 32;
            });
            serviceBusFactoryConfigurator.EnableBatchedOperations = true;
            serviceBusFactoryConfigurator.DefaultMessageTimeToLive = TimeSpan.FromSeconds(60);
        });
    }

    private async Task HandleMessage(ConsumeContext context)
    {
        await Task.Delay(800);
        if (context.ExpirationTime > SystemDateTime.Now)
        {
            await context.RespondAsync(new CouponUsedList { CouponsUsed = new List<CouponCurrentUsed>() });
        }

    }

    public Task LaunchAsync()
    {
        return Bus.StartAsync();
    }

    public Task StopAsync()
    {
        return Bus.StopAsync();
    }
}

1 Ответ

0 голосов
/ 02 июля 2019

Кажется, что здесь, опять же, просто не хватало одного конфига. Весь код, который вы пишете в ReceiveEndpoint, конфигурирует очередь прослушивателя-потребителя, и конфигурации, которые вы предоставляете в CreateUsingAzureServiceBus, являются конфигурациями для очереди ответа потребителя.

Все, что мне было нужно, это добавить одну строку внутри конфигурации потребителя. Без этой конфигурации все предварительно выбранные сообщения обрабатываются постепенно

e.EnableBatchedOperations = true;
...