Я работаю с 2 консольными приложениями .NET Core в сценарии производитель / потребитель с MassTransit / RabbitMQ. Я должен убедиться, что даже если ни один потребитель не запущен, сообщения от производителя все еще успешно помещаются в очередь. Похоже, что это не работает с Publish () - сообщения просто исчезли, поэтому я использую Send (). Сообщения по крайней мере помещаются в очередь, но без каких-либо потребителей, выполняющих сообщения, все попадают в очередь «_skipped».
Итак, это мой первый вопрос : это правильный подход, основанный на требовании (даже если НЕТ потребителей работают, сообщения от производителя по-прежнему успешно ставятся в очередь)?
С Send () мой потребитель действительно работает, но все же многие сообщения проваливаются и попадают в очередь "_skipped". Логика потребителя минимальна (просто регистрирует сообщение в данный момент), поэтому это не длительный процесс.
Итак, это мой второй вопрос : почему так много сообщений все еще выбрасывается в очередь "_skipped"?
И это приводит к моему третьему вопросу : означает ли это, что мой потребитель также должен прослушивать очередь "_skipped"?
Я не уверен, какой код вам нужен для ответа на этот вопрос, но вот скриншот из пользовательского интерфейса управления RabbitMQ:
![RabbitMQ queues](https://i.stack.imgur.com/UnXnB.png)
Конфигурация производителя:
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
});
services.AddHostedService<CardMessageProducer>();
})
.UseConsoleLifetime()
.UseSerilog();
}
static IBusControl ConfigureBus(IServiceProvider provider)
{
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
{
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
});
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
{
EndpointConvention.Map<CardMessage>(e.InputAddress);
});
});
}
Код производителя:
Bus.Send(message);
Конфигурация потребителя:
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<CardMessageConsumer>();
services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));
services.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
});
services.AddHostedService<MassTransitHostedService>();
})
.UseConsoleLifetime()
.UseSerilog();
}
static IBusControl ConfigureBus(IServiceProvider provider)
{
var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
{
h.Username(options.RabbitMQ_Username);
h.Password(options.RabbitMQ_Password);
});
cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
{
e.Consumer<CardMessageConsumer>(provider);
});
//cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
//{
// e.Consumer<CardMessageConsumer>(provider);
//});
});
}
Код потребителя:
class CardMessageConsumer : IConsumer<CardMessage>
{
private readonly ILogger<CardMessageConsumer> logger;
private readonly ApplicationConfiguration configuration;
private long counter;
public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
{
this.logger = logger;
this.configuration = options.Value;
}
public async Task Consume(ConsumeContext<CardMessage> context)
{
this.counter++;
this.logger.LogTrace($"Message #{this.counter} consumed: {context.Message}");
}
}