У меня следующий тип потребителя:
internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
{
var @event = context.Message;
await HandleAsync(@event).ConfigureAwait(false);
}
}
, который зарегистрирован в моем контейнере через:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
{
hostConfigurator.Username(configurationProvider.RabbitMQUsername);
hostConfigurator.Password(configurationProvider.RabbitMQPassword);
hostConfigurator.UseCluster(c =>
{
string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
c.ClusterMembers = hostnames;
});
});
host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;
/*HERE*/ x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);
cfg.ConfigureEndpoints(container);
}));
});
, однако, следуя документации Я бы нравится устанавливать прямой обмен для использования ключей маршрутизации. В документации нигде не удалось найти способ добавления потребителя таким же образом, как я, и в то же время установить свойства привязки конечной точки, как указано в документации.
Когда я пытаюсь получить доступ к конечной точке в добавив потребителя, я могу только изменить имя, количество предварительных выборок и еще пару свойств, но не более того. Однако мне бы хотелось, чтобы мои конечные точки принимали сообщения только с ключом маршрутизации tenantName . Есть ли способ случайно?
РЕДАКТИРОВАТЬ :
Сторона издателя:
container.AddMassTransit(x =>
{
x.AddBus(() => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Send<ObjectAddedIntegrationEvent>(routingCfg => {
routingCfg.UseRoutingKeyFormatter(config => ConfigurationValuesProvider.Current.Get("TenantCode"));
});
cfg.Message<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.SetEntityName("ObjectAddedIntegrationEvent"));
cfg.Publish<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.ExchangeType = ExchangeType.Direct);
var host = cfg.Host(ConfigurationValuesProvider.Current.Get("RabbitMQHostName"), hostConfigurator =>
{
#if !DEBUG
hostConfigurator.Username(ConfigurationValuesProvider.Current.Get("RabbitMQUsername"));
hostConfigurator.Password(ConfigurationValuesProvider.Current.Get("RabbitMQPassword"));
hostConfigurator.UseCluster(c =>
{
string[] hostnames = ConfigurationValuesProvider.Current.Get("RabbitMQNodes").Split(';');
c.ClusterMembers = hostnames;
});
#endif
});
}));
});
Сторона получателя:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
{
hostConfigurator.Username(configurationProvider.RabbitMQUsername);
hostConfigurator.Password(configurationProvider.RabbitMQPassword);
hostConfigurator.UseCluster(c =>
{
string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
c.ClusterMembers = hostnames;
});
});
host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;
x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);
cfg.ConfigureEndpoints(container);
}));
});
internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
{
var @event = context.Message;
await HandleAsync(@event).ConfigureAwait(false);
}
}
internal class ObjectAddedHandlerConsumerDefinition :
ConsumerDefinition<ObjectAddedHandler>
{
private readonly IConfigurationProvider _provider;
public ObjectAddedHandlerConsumerDefinition(IConfigurationProvider provider)
{
_provider = provider;
EndpointName = "ObjectAddedHandler" + provider.TenantName;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
{
if (endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
{
rabbit.BindMessageExchanges = false;
rabbit.Bind("ObjectAddedIntegrationEvent", s =>
{
s.RoutingKey = _provider.TenantName;
s.ExchangeType = ExchangeType.Direct;
});
}
}
}
при запуске потребителя прямой обмен ObjectAddedIntegrationEvent создается правильно. Также ObjectAddedHandlerTenant обмен разветвления (который должен быть соответствующим обменом) создан правильно. К сожалению, когда я пытаюсь отправить сообщение со стороны издателя, и я наблюдаю за прямым обменом ObjectAddedIntegrationEvent , я ничего не вижу.