Как получить текущую область IServiceProvider (MS DI) в MassTransit на фильтре SendContext - PullRequest
0 голосов
/ 10 октября 2019

Я пытаюсь найти информацию о способе получения области DI, созданной для сообщения Consumer, в фильтре SendContext / PublishContext.

Мне удалось найти несколько советов, как это сделать для ConsumeContext. промежуточное программное обеспечение в MassTransit с использованием MassTransit.ExtensionsDependencyInjectionIntegration.ScopeProviders.DependencyInjectionConsumerScopeProvider

public static void UseUserServiceMiddleware(this IPipeConfigurator<ConsumeContext> configurator, IServiceProvider serviceProvider)
{
    // create PayLoad to retrieve current scope in Middleware
    var scopeProvider = new DependencyInjectionConsumerScopeProvider(serviceProvider);
    configurator.AddPipeSpecification(new FilterPipeSpecification<ConsumeContext>(new ScopeFilter(scopeProvider)));
    // add specific filter/middleware to set context of IUserService
    configurator.AddPipeSpecification(new UserServiceMiddlewareSpecification<ConsumeContext>());
}

И в реализации IFliter

public class UserServiceMiddlewareFilter<T> :
    IFilter<T>
    where T : class, ConsumeContext
{
    public void Probe(ProbeContext context)
    {
    }

    public async Task Send(T context, IPipe<T> next)
    {
        try
        {
            // current Consumer scope
            if (context.TryGetPayload(out IServiceScope scope))
            {
                if (context.Headers.TryGetHeader("clientIp", out var clientIp) &&
                    context.Headers.TryGetHeader("subId", out var subId))
                {
                    UserIdentityRetrievalSetter(scope, clientIp?.ToString(), subId?.ToString());
                }
            }

            await next.Send(context).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // propagate the exception up the call stack
            throw;
        }
    }

    private void UserIdentityRetrievalSetter(IServiceScope scope, string clientIp, string subId)
    {
        if (!string.IsNullOrWhiteSpace(clientIp) && !string.IsNullOrWhiteSpace(subId))
        {
            var userIdentityRetrieval =
                scope.ServiceProvider.GetRequiredService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
            userIdentityRetrieval.SetIpAddress(clientIp);
            userIdentityRetrieval.SetUserSubId(subId);
        }
    }
}

С помощью context.TryGetPayload (из области видимости IService)в область, созданную для потребителя сообщений.

Проблема, с которой я сталкиваюсь, заключается в том, как получить текущую область DI для SendContext / PublishContext до того, как сообщение будет отправлено в RabbitMq, чтобы иметь возможность разрешать реализации в областях, которые явозможность установить / перенастроить в промежуточном программном обеспечении ConsumeContext. Идеальное решение для меня заключается в том, что отправитель сообщения и потребитель не знают о какой-то части логики, и существует необходимость в добавлении некоторой дополнительной информации в сообщения (я считаю, что заголовки идеально подходят для этого), которые не являются частьюих.

Я смог написать что-то вроде этого

cfg.ConfigureSend(exec => exec.UseExecute(context =>
                    {
                        if (context.GetType().IsGenericType &&
                            context.GetType().GetGenericTypeDefinition() ==
                            typeof(MassTransit.RabbitMqTransport.Contexts.BasicPublishRabbitMqSendContext<>))
                        {
                            if (context.GetType().GetProperty("Message")?.GetValue(context, null) is
                                Core.MessageBus.Models.IBusDrivenCommunication message)
                            {
                                SendContext sendContext;

                                if (context.TryGetPayload(out ConsumeContext _))
                                {
                                    var userIdentityScoped = provider.GetService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
                                    var clientIpAddress = userIdentityScoped.GetIpAddress();
                                    var subId = userIdentityScoped.GetUserSubGuidClaim();
                                    if (!context.Headers.TryGetHeader("clientIp", out object cip) &&
                                        !context.Headers.TryGetHeader("subId", out object sid))
                                    {
                                        context.Headers.Set("clientIp", clientIpAddress);
                                        context.Headers.Set("subId", subId);
                                    }
                                }

                                if (context.TryGetPayload(out sendContext))
                                {
                                    var userIdentityScoped = provider.GetService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
                                    var clientIpAddress = userIdentityScoped.GetIpAddress();
                                    var subId = userIdentityScoped.GetUserSubGuidClaim();
                                    if (!context.Headers.TryGetHeader("clientIp", out object cip) &&
                                        !context.Headers.TryGetHeader("subId", out object sid))
                                    {
                                        context.Headers.Set("clientIp", clientIpAddress);
                                        context.Headers.Set("subId", subId);
                                    }
                                }
                            }
                        }
                    }));

Поставщик доставлен из IServiceCollectionConfigurator.AddBus, но нет способа разрешить сервисы с определенной областью. Я пытался получить Consumer scope

var consumerScope = context.GetPayload<IConsumerScopeContext>();

var consumeContext = context.GetPayload<ConsumeContext>();
var consumerScopeProvider = provider.GetRequiredService<IConsumerScopeProvider>();
using (var scopeMain = consumerScopeProvider.GetScope(consumeContext))
{
    var serviceScope = scopeMain.Context.GetPayload<IServiceScope>();
    var userIdentityScoped =
        serviceScope.ServiceProvider.GetRequiredService<UserIdentityProvider.RequestHandlers.UserIdentityRetrieval>();
}
return;

Но это слепое пятно, я пытался найти в SendContext какой-либо намек на PayLoad или что-то, чтобы иметь возможность получить текущий объем IServiceProvider.

В крайнем случае я могу переместить логику в модель команд / событий для сообщений из сервисов с заданной областью, а затем изменить ConsumeContext, чтобы получать эти данные из них, а не из заголовков, но я предпочитаю отделять логику от моделей.

В документации MassTrasnit есть функциональность аудита http://masstransit -project.com / MassTransit / advanced / audit / , но, насколько я понимаю, наблюдатели не используются для изменения данных.

...