Я пытаюсь найти информацию о способе получения области DI, созданной для сообщения Consumer, в фильтре SendContext / PublishContext.
Мне удалось найти несколько советов, как это сделать для ConsumeContext. промежуточное программное обеспечение в MassTransit с использованием MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionConsumerScopeProvider
public static void UseUserServiceMiddleware(this IPipeConfigurator<ConsumeContext> configurator, IServiceProvider serviceProvider)
{
// create PayLoad to retrieve current scope in Middleware
var scopeProvider = new DependencyInjectionConsumerScopeProvider(serviceProvider);
configurator.AddPipeSpecification(new FilterPipeSpecification<ConsumeContext>(new ScopeFilter(scopeProvider)));
// add specific filter/middleware to set context of IUserService
configurator.AddPipeSpecification(new UserServiceMiddlewareSpecification<ConsumeContext>());
}
И в реализации IFliter
public class UserServiceMiddlewareFilter<T> :
IFilter<T>
where T : class, ConsumeContext
{
public void Probe(ProbeContext context)
{
}
public async Task Send(T context, IPipe<T> next)
{
try
{
// current Consumer scope
if (context.TryGetPayload(out IServiceScope scope))
{
if (context.Headers.TryGetHeader("clientIp", out var clientIp) &&
context.Headers.TryGetHeader("subId", out var subId))
{
UserIdentityRetrievalSetter(scope, clientIp?.ToString(), subId?.ToString());
}
}
await next.Send(context).ConfigureAwait(false);
}
catch (Exception ex)
{
// propagate the exception up the call stack
throw;
}
}
private void UserIdentityRetrievalSetter(IServiceScope scope, string clientIp, string subId)
{
if (!string.IsNullOrWhiteSpace(clientIp) && !string.IsNullOrWhiteSpace(subId))
{
var userIdentityRetrieval =
scope.ServiceProvider.GetRequiredService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
userIdentityRetrieval.SetIpAddress(clientIp);
userIdentityRetrieval.SetUserSubId(subId);
}
}
}
С помощью context.TryGetPayload (из области видимости IService)в область, созданную для потребителя сообщений.
Проблема, с которой я сталкиваюсь, заключается в том, как получить текущую область DI для SendContext / PublishContext до того, как сообщение будет отправлено в RabbitMq, чтобы иметь возможность разрешать реализации в областях, которые явозможность установить / перенастроить в промежуточном программном обеспечении ConsumeContext. Идеальное решение для меня заключается в том, что отправитель сообщения и потребитель не знают о какой-то части логики, и существует необходимость в добавлении некоторой дополнительной информации в сообщения (я считаю, что заголовки идеально подходят для этого), которые не являются частьюих.
Я смог написать что-то вроде этого
cfg.ConfigureSend(exec => exec.UseExecute(context =>
{
if (context.GetType().IsGenericType &&
context.GetType().GetGenericTypeDefinition() ==
typeof(MassTransit.RabbitMqTransport.Contexts.BasicPublishRabbitMqSendContext<>))
{
if (context.GetType().GetProperty("Message")?.GetValue(context, null) is
Core.MessageBus.Models.IBusDrivenCommunication message)
{
SendContext sendContext;
if (context.TryGetPayload(out ConsumeContext _))
{
var userIdentityScoped = provider.GetService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
var clientIpAddress = userIdentityScoped.GetIpAddress();
var subId = userIdentityScoped.GetUserSubGuidClaim();
if (!context.Headers.TryGetHeader("clientIp", out object cip) &&
!context.Headers.TryGetHeader("subId", out object sid))
{
context.Headers.Set("clientIp", clientIpAddress);
context.Headers.Set("subId", subId);
}
}
if (context.TryGetPayload(out sendContext))
{
var userIdentityScoped = provider.GetService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
var clientIpAddress = userIdentityScoped.GetIpAddress();
var subId = userIdentityScoped.GetUserSubGuidClaim();
if (!context.Headers.TryGetHeader("clientIp", out object cip) &&
!context.Headers.TryGetHeader("subId", out object sid))
{
context.Headers.Set("clientIp", clientIpAddress);
context.Headers.Set("subId", subId);
}
}
}
}
}));
Поставщик доставлен из IServiceCollectionConfigurator.AddBus, но нет способа разрешить сервисы с определенной областью. Я пытался получить Consumer scope
var consumerScope = context.GetPayload<IConsumerScopeContext>();
var consumeContext = context.GetPayload<ConsumeContext>();
var consumerScopeProvider = provider.GetRequiredService<IConsumerScopeProvider>();
using (var scopeMain = consumerScopeProvider.GetScope(consumeContext))
{
var serviceScope = scopeMain.Context.GetPayload<IServiceScope>();
var userIdentityScoped =
serviceScope.ServiceProvider.GetRequiredService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
}
return;
Но это слепое пятно, я пытался найти в SendContext какой-либо намек на PayLoad или что-то, чтобы иметь возможность получить текущий объем IServiceProvider.
В крайнем случае я могу переместить логику в модель команд / событий для сообщений из сервисов с заданной областью, а затем изменить ConsumeContext, чтобы получать эти данные из них, а не из заголовков, но я предпочитаю отделять логику от моделей.
В документации MassTrasnit есть функциональность аудита http://masstransit -project.com / MassTransit / advanced / audit / , но, насколько я понимаю, наблюдатели не используются для изменения данных.