Я играл с PreFetch и пытался понять, почему PreFetch всегда установлен на 0 в интерфейсе управления для очереди. В интерфейсе управления RabbitMQ я вижу настроенную предварительную выборку по каналам, но не саму очередь. Я также заметил, что они зарегистрированы как «глобальные», а не «для каждого потребителя», но, судя по всему, я не могу найти настройки, чтобы изменить это в MassTransit, хотя я предполагаю, что у меня есть недопонимание о том, как это работает, и документы не помогли мне дать ELI5.
Это пример конфигурации:
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
"TEST-QUEUE-PF",
ec =>
{
ec.Consumer<MyConsumer>(context);
ec.PrefetchCount = 50; // consumer specific
ec.UseConcurrencyLimit(1); // consumer specific
});
cfg.PrefetchCount = 100; // bus control specific
cfg.UseConcurrencyLimit(1); // bus control specific
});
Это создает следующую очередь:
И затем, глядя на канал, я вижу следующую информацию о предварительной выборке:
И если я смотрю на все каналы, я вижу следующее:
Я пытаюсь понять, к чему относится каждый из этих PrefetchCounts.
Для справки: у нас есть несколько многоядерных серверов, на которых работают потребители (например, Round-Robin или, более подходяще, "Hungry Hippo", поскольку меня не волнует равное распределение). Настройки по умолчанию для PrefetchCount и ConcurrencyLimit работают не очень хорошо, потому что у нашего потребителя достаточно много работы, и он перегружает сервер базы данных, что приводит к тайм-аутам. Я ищу способ настроить этих потребителей так, чтобы они этого не делали.
Это MassTransit 5.5.5, поскольку все, что нарушает интеграцию UseSerilog (), и я не могу найти легкий путь обновления. Erlang и RabbitMq сами по себе являются текущими версиями. Это модуль AutoFa c более подробно:
private class BusModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterAssemblyTypes(GetType().Assembly).As<IConsumer>();
builder.Register(context =>
{
var busSettings = context.Resolve<BusSettings>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(busSettings.HostAddress),
h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.ReceiveEndpoint(
host,
$"TEST-QUEUE-GLOBAL", // shared queue name for all nodes
ec =>
{
ec.PrefetchCount = 50;
ec.UseConcurrencyLimit(2);
ec.Consumer<MyConsumer>(context);
ec.EnablePriority(5);
ec.UseRetry(retryConfig =>
{
retryConfig
.Intervals(new[] { 1, 2, 4, 8, 16, 32 }
.Select(t => TimeSpan.FromMinutes(t))
.ToArray());
retryConfig
.Handle<HttpRequestException>();
retryConfig
.Handle<SwaggerException>(ex => ex.IsRetryValid());
});
});
cfg.PrefetchCount = 100;
cfg.UseConcurrencyLimit(2);
cfg.UseSerilog();
var correlationIdProvider = context.Resolve<ICorrelationProvider>();
cfg.ConfigurePublish(x => x.UseExecute(sendContext =>
{
sendContext.CorrelationId =
sendContext.CorrelationId == Guid.Empty ?
correlationIdProvider.GetId() : sendContext.CorrelationId; // cascade
}));
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
}
}