Masstransit добавить потребителя в контейнер, указав привязку очереди - PullRequest
0 голосов
/ 09 марта 2020

У меня следующий тип потребителя:

internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
    public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
    {
        var @event = context.Message;
        await HandleAsync(@event).ConfigureAwait(false);
    }
}

, который зарегистрирован в моем контейнере через:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
        {
            hostConfigurator.Username(configurationProvider.RabbitMQUsername);
            hostConfigurator.Password(configurationProvider.RabbitMQPassword);

            hostConfigurator.UseCluster(c =>
            {
                string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
                c.ClusterMembers = hostnames;
            });
        });

        host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;

        /*HERE*/ x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);

        cfg.ConfigureEndpoints(container);
    }));

});

, однако, следуя документации Я бы нравится устанавливать прямой обмен для использования ключей маршрутизации. В документации нигде не удалось найти способ добавления потребителя таким же образом, как я, и в то же время установить свойства привязки конечной точки, как указано в документации.

Когда я пытаюсь получить доступ к конечной точке в добавив потребителя, я могу только изменить имя, количество предварительных выборок и еще пару свойств, но не более того. Однако мне бы хотелось, чтобы мои конечные точки принимали сообщения только с ключом маршрутизации tenantName . Есть ли способ случайно?

РЕДАКТИРОВАТЬ :

Сторона издателя:

container.AddMassTransit(x =>
{
    x.AddBus(() => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {

        cfg.Send<ObjectAddedIntegrationEvent>(routingCfg => {
            routingCfg.UseRoutingKeyFormatter(config => ConfigurationValuesProvider.Current.Get("TenantCode"));
        });
        cfg.Message<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.SetEntityName("ObjectAddedIntegrationEvent"));
        cfg.Publish<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.ExchangeType = ExchangeType.Direct);


        var host = cfg.Host(ConfigurationValuesProvider.Current.Get("RabbitMQHostName"), hostConfigurator =>
        {
#if !DEBUG
            hostConfigurator.Username(ConfigurationValuesProvider.Current.Get("RabbitMQUsername"));
            hostConfigurator.Password(ConfigurationValuesProvider.Current.Get("RabbitMQPassword"));

            hostConfigurator.UseCluster(c =>
            {
                string[] hostnames = ConfigurationValuesProvider.Current.Get("RabbitMQNodes").Split(';');
                c.ClusterMembers = hostnames;
            });
#endif
        });
    }));
});

Сторона получателя:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
        {
            hostConfigurator.Username(configurationProvider.RabbitMQUsername);
            hostConfigurator.Password(configurationProvider.RabbitMQPassword);

            hostConfigurator.UseCluster(c =>
            {
                string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
                c.ClusterMembers = hostnames;
            });
        });

        host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;

        x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);

        cfg.ConfigureEndpoints(container);
    }));

});



internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
    public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
    {
        var @event = context.Message;
        await HandleAsync(@event).ConfigureAwait(false);
    }
}


internal class ObjectAddedHandlerConsumerDefinition :
            ConsumerDefinition<ObjectAddedHandler>
{
    private readonly IConfigurationProvider _provider;

    public ObjectAddedHandlerConsumerDefinition(IConfigurationProvider provider)
    {
        _provider = provider;

        EndpointName = "ObjectAddedHandler" + provider.TenantName;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
    IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
    {
        if (endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
        {
            rabbit.BindMessageExchanges = false;

            rabbit.Bind("ObjectAddedIntegrationEvent", s =>
            {
                s.RoutingKey = _provider.TenantName;
                s.ExchangeType = ExchangeType.Direct;
            });
        }
    }
}

при запуске потребителя прямой обмен ObjectAddedIntegrationEvent создается правильно. Также ObjectAddedHandlerTenant обмен разветвления (который должен быть соответствующим обменом) создан правильно. К сожалению, когда я пытаюсь отправить сообщение со стороны издателя, и я наблюдаю за прямым обменом ObjectAddedIntegrationEvent , я ничего не вижу.

1 Ответ

2 голосов
/ 09 марта 2020

Вы можете сделать это с помощью определения потребителя, которое должно быть добавлено вместе с вашим потребителем с помощью AddConsumer<T>(typeof(definitionclass)). Это может быть похоже на это:

public class ObjectAddedHandlerDefinition :
    ConsumerDefinition<ObjectAddedHandler>
{
    public ObjectAddedHandlerDefinition(IConfigurationProvider provider)
    {
        _provider = provider;

        EndpointName = "ObjectAddedHandler" + provider.TenantName;

        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
    {
        if(endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
        {
            rabbit.BindMessageExchanges = false;

            // or use Bind<T> for message type name
            rabbit.Bind("some-exchange", s => 
            {
                s.RoutingKey = _provider.TenantName;
                s.ExchangeType = ExchangeType.Direct;
            });
        }
    }
}
...