MassTransit - объяснение PrefetchCount и нескольких каналов для одного потребителя - PullRequest
1 голос
/ 29 апреля 2020

Я играл с PreFetch и пытался понять, почему PreFetch всегда установлен на 0 в интерфейсе управления для очереди. В интерфейсе управления RabbitMQ я вижу настроенную предварительную выборку по каналам, но не саму очередь. Я также заметил, что они зарегистрированы как «глобальные», а не «для каждого потребителя», но, судя по всему, я не могу найти настройки, чтобы изменить это в MassTransit, хотя я предполагаю, что у меня есть недопонимание о том, как это работает, и документы не помогли мне дать ELI5.

Это пример конфигурации:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
   var host = cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });

    cfg.ReceiveEndpoint(
        host,
        "TEST-QUEUE-PF",
        ec =>
        {
            ec.Consumer<MyConsumer>(context);
            ec.PrefetchCount = 50; // consumer specific
            ec.UseConcurrencyLimit(1); // consumer specific
        });

    cfg.PrefetchCount = 100; // bus control specific
    cfg.UseConcurrencyLimit(1); // bus control specific
});

Это создает следующую очередь:

Queue

И затем, глядя на канал, я вижу следующую информацию о предварительной выборке:

enter image description here

И если я смотрю на все каналы, я вижу следующее:

enter image description here

Я пытаюсь понять, к чему относится каждый из этих PrefetchCounts.

Для справки: у нас есть несколько многоядерных серверов, на которых работают потребители (например, Round-Robin или, более подходяще, "Hungry Hippo", поскольку меня не волнует равное распределение). Настройки по умолчанию для PrefetchCount и ConcurrencyLimit работают не очень хорошо, потому что у нашего потребителя достаточно много работы, и он перегружает сервер базы данных, что приводит к тайм-аутам. Я ищу способ настроить этих потребителей так, чтобы они этого не делали.

Это MassTransit 5.5.5, поскольку все, что нарушает интеграцию UseSerilog (), и я не могу найти легкий путь обновления. Erlang и RabbitMq сами по себе являются текущими версиями. Это модуль AutoFa c более подробно:

private class BusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
        builder.Register(context =>
        {
            var busSettings = context.Resolve<BusSettings>();
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(
                    new Uri(busSettings.HostAddress),
                    h =>
                    {
                        h.Username(busSettings.Username);
                        h.Password(busSettings.Password);
                    });

                cfg.ReceiveEndpoint(
                    host,
                    $"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
                    ec =>
                    {
                        ec.PrefetchCount = 50;
                        ec.UseConcurrencyLimit(2);
                        ec.Consumer<MyConsumer>(context);
                        ec.EnablePriority(5);
                        ec.UseRetry(retryConfig =>
                        {
                            retryConfig
                                .Intervals(new[] { 1, 2, 4, 8, 16, 32 }
                                .Select(t => TimeSpan.FromMinutes(t))
                                .ToArray());
                            retryConfig
                                .Handle<HttpRequestException>();
                            retryConfig
                                .Handle<SwaggerException>(ex => ex.IsRetryValid());
                        });
                    });

                cfg.PrefetchCount = 100;
                cfg.UseConcurrencyLimit(2);
                cfg.UseSerilog();

                var correlationIdProvider = context.Resolve<ICorrelationProvider>();
                cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
                {
                    sendContext.CorrelationId = 
                        sendContext.CorrelationId == Guid.Empty ? 
                            correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
                }));
            });

            return busControl;
        })
        .SingleInstance()
        .As<IBusControl>()
        .As<IBus>();
    }
}

1 Ответ

1 голос
/ 29 апреля 2020

Во-первых, я предполагаю, что вы используете более старую версию MassTransit, поскольку был сделан переход, чтобы отойти от глобальной предварительной выборки, начиная с v6.

Во-вторых, большое количество предварительных выборок в сочетании с лимитом параллелизма значение 1 приведет к тому, что сообщения (prefetchcount - 1) будут находиться на конечной точке приема, ожидая обработки, пока обрабатывается 1 сообщение. Таким образом, если имеется только 50 сообщений, первый узел может получить их все, а затем остальные узлы простаивают, поскольку сообщения ожидают на одном узле, чтобы преодолеть узкое место.

Текущая версия RabbitMQ Консоль управления с предварительной выборкой канала показана ниже:

RabbitMQ Prefetch Count

Поскольку MassTransit выводит на канал только одного потребителя, предыдущий подход существенно ограничивал потребителя в глобальную предварительную выборку канала, но теперь это более явно. Кроме того, новый параметр работает с очередями кворума, которые не поддерживают глобальную настройку предварительной выборки.

Если вы перегружаете базу данных и уже оптимизировали запрос к базе данных, чтобы избежать блокировки / блокировки, и вам необходимо уменьшите поток, уменьшите предварительную выборку, чтобы приблизиться к чему-то вроде 140% от вашего ограничения параллелизма. Итак, если серьезно, если вы на 1, установите предварительную выборку равной 2.

...