MassTransit - дождаться завершения всех действий и продолжить обработку - PullRequest
4 голосов
/ 10 ноября 2019

Если у меня много операций, это приводит к блокировке ресурсов или истечению времени ожидания?

Вот мой сценарий:

У меня есть контроллер API, который отправляет запрос Order потребителю;Я использую шаблон запроса / ответа, чтобы получить ErrorMessage свойство от потребителя и основываться на ответе этого свойства обратно, если оно равно null, я бы хотел вернуть OK(), в противном случае вернуть BadRequest или Ok, но с таким сообщением, как: Продукт отсутствует на складе для уведомления клиента .

У моего потребителя есть сборкабланк маршрутизации, который имеет 2 действия:

  • CreateOrderActivity : создает заказ с деталями заказа.
  • ReserveProductActivity :Что уменьшает количество товара на складе, if product quantity < 0 Я опубликую сообщение с ErrorMessage обратно к потребителю и компенсирую предыдущую активность.

    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        try
        {
            if (!string.IsNullOrEmpty(context.Message.ErrorMessage))
            {
                await context.RespondAsync<OrderSubmitted>(new
                {
                    context.Message.OrderId,
                    context.Message.ErrorMessage
                });
    
                return;
            }
    
            RoutingSlipBuilder builder = new RoutingSlipBuilder(context.Message.OrderId);
            // get configs
            var settings = new Settings(_configuration);
    
            // Add activities
            builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
            builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });
    
            builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
            builder.SetVariables(new { context.Message.OrderDetails });
    
    
            await context.Execute(builder.Build());
    
            await context.RespondAsync<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
    
        }
        catch (Exception ex)
        {
            _log.LogError("Can not create Order {OrderId}", context.Message.OrderId);
            throw new Exception(ex.Message);
        }
    }
    

Код для ReserveProductActivity:

    public async Task<ExecutionResult> Execute(ExecuteContext<ReserveProductArguments> context)
    {
        var orderDetails = context.Arguments.OrderDetails;

        foreach (var orderDetail in orderDetails)
        {
            var product = await _productRepository.GetByProductId(orderDetail.ProductId);
            if (product == null) continue;

            var quantity = product.SetQuantity(product.QuantityInStock - orderDetail.Quantity);


            if (quantity < 0)
            {
                var errorMessage = "Out of stock.";
                await context.Publish<ProcessOrder>(new
                {
                    ErrorMessage = errorMessage
                });
                throw new RoutingSlipException(errorMessage);
            }

            await _productRepository.Update(product);
        }

        return context.Completed(new Log(orderDetails.Select(x => x.ProductId).ToList()));
    }

Эта строка кода в потребительском методе ожидает контекст. Execute (builder.Build ())

Сначала я подумал, что он создаст квитанцию ​​и выполнит все действияПрежде чем перейти к следующей строке, но это не так. Вместо этого он сразу переходит к следующей строке кода (которая отвечает обратно контроллеру), а затем после выполнения действий, а это не то, что мне нужно. Мне нужно сначала проверить количество продукта во 2-м действии и на основе этого возврата вернуться к контроллеру.

(В текущем случае он всегда отвечает сначала контроллеру - строка после buider.Buid(), а затем, если quantity < 0, он по-прежнему переходит к самому первому условию метода потребления, но так как он уже отвечает, я не могу вызвать ответ внутри этого оператора if снова.

Короче говоря, если продукт все еще доступен во втором задании, я могу отправить ответ как обычно (который выполняет код после context.Execute(builder.Build()), но если quantity < 0 - чтоЯ публикую обратно на потребительский метод с ErrorMessage , я бы хотел, чтобы он перешел к самому первому, если условие метода Consume (if(!string.IsNullOrEmpty(context.Message.ErrorMessage)) ...) и основано на ErrorMessage уведомить клиента.

Что-то не так с этим подходом? Как мне добиться чего-то подобного?

Спасибо

Ответы [ 2 ]

1 голос
/ 11 ноября 2019

Не задокументировано, но возможно использовать прокси для выполнения квитанции маршрутизации и ответа на запрос с результатом квитанции маршрутизации. Вы можете увидеть подробности в модульных тестах:

https://github.com/MassTransit/MassTransit/blob/master/src/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L26

Вы можете создать прокси, который формирует квитанцию ​​о маршруте и выполняет ее, а также прокси-ответчик - оба из которых затемнастроенный на конечной точке получения как .Instance потребителей.

  class RequestProxy :
        RoutingSlipRequestProxy<Request>
    {
        protected override void BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<Request> request)
        {
        // get configs
        var settings = new Settings(_configuration);

        // Add activities
        builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
        builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });

        builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
        builder.SetVariables(new { context.Message.OrderDetails });
        }
    }


    class ResponseProxy :
        RoutingSlipResponseProxy<Request, Response>
    {
        protected override Response CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, Request request)
        {
            return new Response();
        }
    }

Вы можете затем вызвать его от потребителя или поместить логику упорядочения в прокси-сервер - в зависимости от того, что имеет смысл, а затем использовать клиент запроса от вашегоконтроллер для отправки запроса и ожидания ответа.

0 голосов
/ 13 ноября 2019

спасибо за ответ, я могу вызвать запрос и ответ прокси внутри метода потребителя? Прямо сейчас мне нужно вызвать его при запуске при настройке masstransit bus

private static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<AppConfig>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(options.Host, options.VirtualHost, h =>
            {
                h.Username(options.Username);
                h.Password(options.Password);
            });

            cfg.ReceiveEndpoint(host, "process-order", e =>
            {
                e.Consumer(() => new ProcessOrderConsumer(provider.GetRequiredService<IConfiguration>()));

                var a = new ProcessOrderRequestProxy(provider.GetRequiredService<IConfiguration>());
                var b = new ResponseProxy();
                e.Instance(a);
                e.Instance(b);
            });

            var compensateCreateOrderAddress = new Uri(string.Concat("rabbitmq://localhost/", "compensate_createorder"));
            cfg.ReceiveEndpoint(host, "execute_createorder", e =>
            {
                e.ExecuteActivityHost<CreateOrderActivity, CreateOrderArguments>(compensateCreateOrderAddress, () => new
                    CreateOrderActivity(provider.GetRequiredService<IOrderRepository>(), provider.GetRequiredService<IOrderDetailRepository>()));
            });
            cfg.ReceiveEndpoint(host, "compensate_createorder", e =>
            {
                e.CompensateActivityHost<CreateOrderActivity, CreateOrderLog>(() => new
                    CreateOrderActivity(provider.GetRequiredService<IOrderRepository>(), provider.GetRequiredService<IOrderDetailRepository>()));
            });
        });
    }
...