Как написать диспетчер автоматической подписки EasyNetQ для поставщика услуг ASP.NET Core? - PullRequest
1 голос
/ 13 мая 2019

Таким образом, у автоматического подписчика EasyNetQ есть базовый диспетчер по умолчанию, который не может создавать классы получателей сообщений с конструкторами без параметров.

Чтобы увидеть это в действии, создайте получателя с требуемой зависимостью.Вы можете настроить свой собственный сервис или использовать ILogger<T>, который автоматически регистрируется по умолчанию в рамках фреймворка.

ConsumeTextMessage.cs

public class ConsumeTextMessage : IConsume<TextMessage>
{
    private readonly ILogger<ConsumeTextMessage> logger;

    public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
    {
        this.logger = logger;
    }

    public void Consume(TextMessage message)
    {
        ...
    }
}

Подключитеавтоматический подписчик (здесь есть некоторая свобода в том, что касается того, где / когда написать / запустить этот код).

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}

(Где-тоиначе, возможно startup.Configure или BackgroundService)

var subscriber = new AutoSubscriber(bus, "example");
subscriber.Subscribe(Assembly.GetExecutingAssembly());

Теперь, запустите программу и опубликуйте несколько сообщений, вы должны увидеть, что каждое сообщение попадает в очередь ошибок по умолчанию.

System.MissingMethodException: No parameterless constructor defined for this object.
   at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
   at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
   at System.Activator.CreateInstance[T]()
   at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.DispatchAsync[TMessage,TAsyncConsumer](TMessage message)
   at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandlerInternalAsync(ConsumerExecutionContext context)

Я знаю, что могу предоставить своего собственного диспетчера , но как нам добиться этого, работая с поставщиком услуг ASP.NET Core;обеспечение того, чтобы это работало с сервисами с ограниченным доступом?

1 Ответ

1 голос
/ 14 мая 2019

Итак, вот что я придумал.

public class MessageDispatcher : IAutoSubscriberMessageDispatcher
{
    private readonly IServiceProvider provider;

    public MessageDispatcher(IServiceProvider provider)
    {
        this.provider = provider;
    }

    public void Dispatch<TMessage, TConsumer>(TMessage message)
        where TMessage : class
        where TConsumer : class, IConsume<TMessage>
    {
        using(var scope = provider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            consumer.Consume(message);
        }
    }

    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
        where TMessage : class
        where TConsumer : class, IConsumeAsync<TMessage>
    {
        using(var scope = provider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ConsumeAsync(message);
        }
    }
}

Несколько заметных моментов ...

Зависимость IServiceProvider - это контейнер DI ASP.NET Core.Поначалу это может быть неясно, потому что на протяжении Startup.ConfigureServices() вы регистрируете типы с использованием другого интерфейса, IServiceCollection.

public MessageDispatcher(IServiceProvider provider)
{
    this.provider = provider;
}

Чтобы разрешить сервисы с определенными областями, вам необходимо создавать и управлять ими.жизненный цикл области вокруг создания и использования потребителя.И я использую метод расширения GetRequiredService<T>, потому что мне действительно нужно неприятное исключение, а не пустая ссылка, которая может просочиться на некоторое время, прежде чем мы заметим это (в форме исключения нулевой ссылки).

using(var scope = provider.CreateScope())
{
    var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
    consumer.Consume(message);
}

Если вы использовали только provider напрямую, как в provider.GetRequiredService<T>(), вы увидите ошибку, подобную этой, при попытке разрешить ограниченного потребителя или зависимую область для потребителя.

Возникло исключение: «System.InvalidOperationException» в Microsoft.Extensions.DependencyInjection.dll: «Не удается разрешить службу с областью действия» Example.Messages.ConsumeTextMessage 'от корневого поставщика.'

Для разрешения области действияобслуживая и поддерживая их жизненный цикл должным образом для асинхронных потребителей, вам нужно получить ключевые слова async / await в нужном месте.Вам следует дождаться вызова ConsumeAsync, который требует, чтобы метод был асинхронным.Используйте точки останова на линии ожидания и у вашего потребителя и построчно, чтобы лучше справиться с этим!

public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
    where TMessage : class
    where TConsumer : class, IConsumeAsync<TMessage>
{
    using(var scope = provider.CreateScope())
    {
        var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
        await consumer.ConsumeAsync(message);
    }
}

ОК, теперь, когда у нас есть диспетчер, нам просто нужно все настроитьправильно в автозагрузке.Нам нужно разрешить диспетчеру от провайдера, чтобы провайдер мог предоставить себя должным образом.Это просто один способ сделать это.

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    // messaging
    services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
    services.AddSingleton<MessageDispatcher>();
    services.AddSingleton<AutoSubscriber>(provider =>
    {
        var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
        {
            AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>();
        }
    });

    // message handlers
    services.AddScoped<ConsumeTextMessage>();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    app.ApplicationServices.GetRequiredServices<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
}
...