MassTransit 5.2, SignalR: Как я могу получить IHubContext внутри моего потребителя? - PullRequest
0 голосов
/ 14 января 2019

Моя главная проблема - получить правильный экземпляр концентратора SignalR.

Контекст: я создаю веб-приложение, которое взаимодействует с несколькими внешними системами. Операции CRUD в моем приложении приводят к обновлению баз данных внешних систем.

В этом примере у меня работает 3 службы:

ExternalSystem | StateMachine | .NET CORE WebAPI

Когда я публикую форму «создать сотрудника», сообщение RabbitMQ будет отправлено из WebAPI в машину состояний. Затем машина состояний отправляет пару сообщений о создании моей внешней системной службе, которая обновляет базу данных. После этого он обновляет машину состояний для отслеживания операции create.

Форма -> API -> StateMachine -> ExternalSystem -> StateMachine -> API

Пока все хорошо. Теперь я хотел бы использовать SignalR для отправки обновлений статуса клиенту. Итак, я реализовал этот потребитель в API:

public class UpdatesConsumer :
    IConsumer<IExternalSystemUpdateMessage>
{
    private readonly IHubContext<UpdatesHub> _updaterHubContext;

    public UpdatesConsumer(IHubContext<UpdatesHub> hubContext)
    {
        _updaterHubContext = hubContext;
    }

    public Task Consume(ConsumeContext<IExternalSystemUpdateMessage> context)
    {
        //return _updaterHubContext.Clients.Group(context.Message.CorrelationId.ToString()).SendAsync("SEND_UPDATE", context.Message.Message);// this.SendUpdate(context.Message.CorrelationId, context.Message.Message);
        return _updaterHubContext.Clients.All.SendAsync("SEND_UPDATE", context.Message.Message);
    }
}

Это мой SignalR хаб:

public class UpdatesHub :
    Hub
{
    public Task SendUpdate(Guid correlationId, string message)
    {
        return Clients.Group(correlationId.ToString()).SendAsync("SEND_UPDATE", message);
    }
}

А вот как создаются шины и потребители:

    public void ConfigureServices(IServiceCollection services)
    {
        _services = services;

        services.AddMvc();
        services.AddSignalR();            
        //services.AddSingleton<IHubContext<UpdatesHub>>();

        WebAPI.CreateBus();
    }

    public static IServiceCollection _services;

    static IBusControl _busControl;
    public static IBusControl Bus
    {
        get
        {
            return _busControl;
        }
    }

    public static void CreateBus()
    {
        IRMQConnection rmqSettings = Config.GetRMQConnectionConfig("rmq-settings.json", "connection");

        _busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
        {
            var host = x.Host(BusInitializer.GetUri("", rmqSettings), h =>
            {
                h.Username(rmqSettings.UserName);
                h.Password(rmqSettings.Password);
            });

            x.ReceiveEndpoint(host, "externalsystems.update",
                e => { e.Consumer(() => new UpdatesConsumer((IHubContext<UpdatesHub>)Startup.__serviceProvider.GetService(typeof(IHubContext<UpdatesHub>)))); });
        });

        TaskUtil.Await(() => _busControl.StartAsync());
    }

=============================================== ==========================

Так что проблема в том, что _updaterHubContext.Clients в моем классе Consumer всегда оказываются пустыми. Я проверил доступ к концентратору в контроллере, и клиенты действительно появляются:

public class TestController : Controller
{
    private readonly IHubContext<UpdatesHub> _hubContext;
    public TestController(IHubContext<UpdatesHub> hubContext)
    {
        _hubContext = hubContext;
    }

    [HttpGet]
    [Route("api/Test/")]
    public IActionResult Index()
    {
        return View();
    }
}

Как я могу получить правильный экземпляр концентратора в моем классе Consumer? Или как я могу получить доступ к коллекции IServiceCollection, которую использует .net?

Спасибо заранее!

Ответы [ 2 ]

0 голосов
/ 14 января 2019

Вы можете зарегистрировать своего потребителя, чтобы MassTransit разрешил его из IServiceProvider, используя поддержку, предоставляемую в пакете MassTransit.Extensions.DependencyInjection.

x.ReceiveEndpoint(host, "externalsystems.update", e => 
{
    e.Consumer<UpdatesConsumer>(_serviceProvider);
});

Обязательно зарегистрируйте свой UpdatesConsumer в контейнере. Это должно разрешить новый экземпляр получателя для каждого сообщения, полученного в конечной точке.

0 голосов
/ 14 января 2019

Почему бы не зарегистрировать шину, используя Microsoft Dependency Injection . Это должно решить вашу проблему, это разрешит вашего потребителя, используя IServiceProvider

...