Итак, вот что я придумал.
public class MessageDispatcher : IAutoSubscriberMessageDispatcher
{
private readonly IServiceProvider provider;
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
public void Dispatch<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsume<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
}
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
}
Несколько заметных моментов ...
Зависимость IServiceProvider
- это контейнер DI ASP.NET Core.Поначалу это может быть неясно, потому что на протяжении Startup.ConfigureServices()
вы регистрируете типы с использованием другого интерфейса, IServiceCollection
.
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
Чтобы разрешить сервисы с определенными областями, вам необходимо создавать и управлять ими.жизненный цикл области вокруг создания и использования потребителя.И я использую метод расширения GetRequiredService<T>
, потому что мне действительно нужно неприятное исключение, а не пустая ссылка, которая может просочиться на некоторое время, прежде чем мы заметим это (в форме исключения нулевой ссылки).
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
Если вы использовали только provider
напрямую, как в provider.GetRequiredService<T>()
, вы увидите ошибку, подобную этой, при попытке разрешить ограниченного потребителя или зависимую область для потребителя.
Возникло исключение: «System.InvalidOperationException» в Microsoft.Extensions.DependencyInjection.dll: «Не удается разрешить службу с областью действия» Example.Messages.ConsumeTextMessage 'от корневого поставщика.'
Для разрешения области действияобслуживая и поддерживая их жизненный цикл должным образом для асинхронных потребителей, вам нужно получить ключевые слова async / await в нужном месте.Вам следует дождаться вызова ConsumeAsync
, который требует, чтобы метод был асинхронным.Используйте точки останова на линии ожидания и у вашего потребителя и построчно, чтобы лучше справиться с этим!
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
ОК, теперь, когда у нас есть диспетчер, нам просто нужно все настроитьправильно в автозагрузке.Нам нужно разрешить диспетчеру от провайдера, чтобы провайдер мог предоставить себя должным образом.Это просто один способ сделать это.
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
// messaging
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
services.AddSingleton<MessageDispatcher>();
services.AddSingleton<AutoSubscriber>(provider =>
{
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>();
}
});
// message handlers
services.AddScoped<ConsumeTextMessage>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.ApplicationServices.GetRequiredServices<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
}