Промежуточное программное обеспечение с Masstransit опубликовать - PullRequest
5 голосов
/ 09 октября 2019

У меня есть приложение .net core WEB API с MassTransit (для реализации брокера сообщений RabbitMQ). Конфигурация RabbitMQ-MassTransit проста и выполняется в несколько строк кода в файле Startup.cs.

services.AddMassTransit(x =>
        {
            x.AddConsumer<CustomLogConsume>();

            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://rabbitmq/"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ExchangeType = ExchangeType.Fanout;

                cfg.ReceiveEndpoint(host, "ActionLog_Queue", e =>
                {
                    e.PrefetchCount = 16;
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

Я использую внедрение зависимостей в своем проекте для лучшего стандарта кода. Публикация сообщений работает нормально с внедрением зависимостей контроллера. Но когда я внедрил пользовательское промежуточное программное обеспечение для действий журнала, Masstransit не смог правильно опубликовать сообщение, он создал дополнительную очередь с _error в веб-консоли RabbitMQ.

public class RequestResponseLoggingMiddleware
{
    #region Private Variables

    /// <summary>
    /// RequestDelegate
    /// </summary>
    private readonly RequestDelegate _next;

    /// <summary>
    /// IActionLogPublish
    /// </summary>
    private readonly IActionLogPublish _logPublish;

    #endregion

    #region Constructor
    public RequestResponseLoggingMiddleware(RequestDelegate next, IActionLogPublish logPublish)
    {
        _next = next;
        _logPublish = logPublish;
    }
    #endregion

    #region PrivateMethods

    #region FormatRequest
    /// <summary>
    /// FormatRequest
    /// </summary>
    /// <param name="request"></param>
    /// <returns></returns>
    private async Task<ActionLog> FormatRequest(HttpRequest request)
    {
        ActionLog actionLog = new ActionLog();
        var body = request.Body;
        request.EnableRewind();

        var context = request.HttpContext;

        var buffer = new byte[Convert.ToInt32(request.ContentLength)];
        await request.Body.ReadAsync(buffer, 0, buffer.Length);
        var bodyAsText = Encoding.UTF8.GetString(buffer);
        request.Body = body;

        var injectedRequestStream = new MemoryStream();

        var requestLog = $"REQUEST HttpMethod: {context.Request.Method}, Path: {context.Request.Path}";

        using (var bodyReader = new StreamReader(context.Request.Body))
        {
            bodyAsText = bodyReader.ReadToEnd();

            if (string.IsNullOrWhiteSpace(bodyAsText) == false)
            {
                requestLog += $", Body : {bodyAsText}";
            }

            var bytesToWrite = Encoding.UTF8.GetBytes(bodyAsText);
            injectedRequestStream.Write(bytesToWrite, 0, bytesToWrite.Length);
            injectedRequestStream.Seek(0, SeekOrigin.Begin);
            context.Request.Body = injectedRequestStream;
        }

        actionLog.Request = $"{bodyAsText}";
        actionLog.RequestURL = $"{request.Scheme} {request.Host}{request.Path} {request.QueryString}";

        return actionLog;
    }
    #endregion

    #region FormatResponse
    private async Task<string> FormatResponse(HttpResponse response)
    {
        response.Body.Seek(0, SeekOrigin.Begin);
        var text = await new StreamReader(response.Body).ReadToEndAsync();
        response.Body.Seek(0, SeekOrigin.Begin);

        return $"Response {text}";
    }
    #endregion

    #endregion

    #region PublicMethods

    #region Invoke
    /// <summary>
    /// Invoke - Hits before executing any action. Actions call executes from _next(context)
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public async Task Invoke(HttpContext context)
    {
        ActionLog actionLog = new ActionLog();

        actionLog = await FormatRequest(context.Request);


        var originalBodyStream = context.Response.Body;

        using (var responseBody = new MemoryStream())
        {
            context.Response.Body = responseBody;

            await _next(context);

            actionLog.Response = await FormatResponse(context.Response);

            await _logPublish.Publish(actionLog);
            await responseBody.CopyToAsync(originalBodyStream);
        }
    }
    #endregion

    #endregion
}

настройка промежуточного программного обеспечения при запуске

  public async void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ............
        app.UseMiddleware<RequestResponseLoggingMiddleware>();
        ....................
    }

Существует ли какая-либо дополнительная конфигурация для запуска MassTransit для работы с Middle Ware

Редактировать

IActionLogPublish

public interface IActionLogPublish
{
    Task Publish(ActionLog model);
}

ActionLogPublish

public class ActionLogPublish : IActionLogPublish
{

    private readonly IBus _bus;

    public ActionLogPublish(IBus bus)
    {
        _bus = bus;
    }

    public async Task Publish(ActionLog actionLogData)
    {
        /* Publish values to RabbitMQ Service Bus */

        await _bus.Publish(actionLogData);

        /* Publish values to RabbitMQ Service Bus */
    }

}

Редактировать

Веб-консоль RabbitMQ

enter image description here

Ответы [ 2 ]

2 голосов
/ 14 октября 2019

Трудно сказать по описанию, какую именно ошибку вы получаете. Реализация промежуточного программного обеспечения выглядит сложной, и это может быть источником ошибки. Я предполагаю, что вы не правильно установили позицию потока или что-то в этом роде. Исправления от @Nkosi могут на самом деле исправить это.

Если вы говорите, что IBus работает правильно из контроллеров, которые создаются для запроса, вы можете попробовать реализовать интерфейс IMiddleware в своем промежуточном ПО, как описано вthis doc .

public class RequestResponseLoggingMiddleware : IMiddleware
{
    IActionLogPublish logPublish;

    public RequestResponseLoggingMiddleware(IActionLogPublish logPublish)
    {
        this.logPublish = logPublish;
    }

    // ...

    public async Task InvokeAsync(HttpContext context, RequestDelegate next)
    {
        //...
    }

    //...
}

. В этом случае промежуточное программное обеспечение будет зарегистрировано как сервис с областью действия или переходным процессом и разрешено для каждого запроса, так же, как контроллер. Что также может решить вашу проблему, если она связана с разрешением сервисов в области.

2 голосов
/ 14 октября 2019

Промежуточному программному обеспечению необходимо вернуть исходное тело в ответ.

Кроме того, внедренная зависимость отлично работает с контроллерами, а не с промежуточным программным обеспечением, так как она может быть зарегистрирована с временем жизни области.

В этом случае конструктор должен вводиться не в промежуточное ядро, а непосредственно в Invoke

Поскольку промежуточное ПО создается при запуске приложения, а не по запросу, scoped сервисы времени жизни, используемые конструкторами промежуточного программного обеспечения, не используются совместно с другими внедренными типами зависимостей во время каждого запроса. Если вам необходимо совместно использовать сервис с определенной областью между промежуточным программным обеспечением и другими типами, добавьте эти сервисы в подпись метода Invoke. Метод Invoke может принимать дополнительные параметры, которые заполняются DI:

//...omitted for brevity

public RequestResponseLoggingMiddleware(RequestDelegate next) {
    _next = next;
}

//...

private async Task<string> FormatResponseStream(Stream stream) {
    stream.Seek(0, SeekOrigin.Begin);
    var text = await new StreamReader(stream).ReadToEndAsync();
    stream.Seek(0, SeekOrigin.Begin);
    return $"Response {text}";
}

public async Task Invoke(HttpContext context, IActionLogPublish logger) {
    ActionLog actionLog = await FormatRequest(context.Request);
    //keep local copy of response stream
    var originalBodyStream = context.Response.Body;

    using (var responseBody = new MemoryStream()) {
        //replace stream for down stream calls
        context.Response.Body = responseBody;

        await _next(context);

        //put original stream back in the response object
        context.Response.Body = originalBodyStream; // <-- THIS IS IMPORTANT

        //Copy local stream to original stream
        responseBody.Position = 0;
        await responseBody.CopyToAsync(originalBodyStream);

        //custom logging
        actionLog.Response = await FormatResponse(responseBody);
        await logger.Publish(actionLog);
    }
}

Ссылка Внедрение зависимостей в ASP.NET Core: срок службы службы

При использовании службы промежуточного уровня в промежуточном программном обеспечении внедрите службу в метод Invoke или InvokeAsync. Не вводить с помощью конструктора, потому что это заставляет сервис работать как одиночный . Для получения дополнительной информации см. Создание пользовательского промежуточного программного обеспечения ASP.NET Core .

Акцент на шахте

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...