Publi sh - Azure служебная шина с использованием MassTransit вызывает ошибки - PullRequest
0 голосов
/ 28 мая 2020

Я новичок в MassTransit и Azure Service Bus. Я пытаюсь использовать архитектуру, в которой RabbitMq или Azure служебная шина используется в API. NET Core 3.1. У меня есть часть RabbitMq, и я только что запустил сервисную шину Azure. У меня есть API, который принимает входящие данные и публикует sh их в очереди. Когда я пытаюсь опубликовать sh с помощью подхода Azure служебной шины, я получаю сообщение об ошибке "SubCode = 40000. Не могу работать с типом Topi c, потому что пространство имен servicehubqa использует Basi c 'tier.

Я пытаюсь использовать подход с использованием очереди и надеюсь создать очередь по мере публикации сообщений. В настоящее время служебная шина использует ценовой уровень Basi c, как указано в документации. что я могу играть с очередями на этом уровне. Я не уверен, нужно ли мне вручную создавать очередь (мне пришлось сделать этот подход с помощью RabbitMq, поскольку очередь не будет создана, если нет потребителя). По умолчанию используется topi c подход, если ничего не указано? Как указать очередь vs topi c?

Мой код выглядит следующим образом:

Запуск - ConfigureServices

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton(Configuration);

        services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
        services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();

        var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
        switch (messageProvider)
        {
            case "AzureServiceBus":
                services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
                break;
            case "RabbitMq":
                services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
                break;
            default:
                throw new ArgumentException("Invalid message service");
        };

        services.AddControllers();
    }

Контроллер

public class ListenerController : ControllerBase
{
    readonly ILogger<ListenerController> logger;
    readonly IMessagingService messenger;

    public ListenerController(
        ILogger<ListenerController> logger,
        IMessagingService messenger)
    {
        this.logger = logger;
        this.messenger = messenger;
    }

    [HttpPost]
    public async Task<IActionResult> Post()
    {
        var payload = new
        {
            ...
        };

        await messenger.Publish(payload);

        return Ok();
    }
}

IMessagingService

public interface IMessagingService
{
    Task Publish(object payload);
}

IMassTransitTransport

public interface IMassTransitTransport
{
    IBusControl BusControl { get; }
}

public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }

public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }

MassTransitAzureServiceBusTransport

public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
    public IBusControl BusControl { get; }

    public MassTransitAzureServiceBusTransport()
    {
        BusControl = ConfigureBus();
        BusControl.StartAsync();
    }

    IBusControl ConfigureBus()
    {
        return Bus.Factory.CreateUsingAzureServiceBus(config => {
            var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
        });
    }
}

MassTransitAzureServiceBusMessagingService

public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
    readonly IMassTransitAzureServiceBusTransport massTransitTransport;

    public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
    {
        //transport bus config already happens in massTransitTransport constructor
        this.massTransitTransport = massTransitTransport;
    }

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                //IRegisterCommandUpdateStatus is an interface specifying the properties needed
                await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 28 мая 2020

В документации MassTransit действительно указано, что если вы хотите использовать Topi c (то есть возможность публиковать sh для нескольких подписок одновременно), вы используете publi sh. Если вы хотите отправить сообщение в очередь (сообщение направляется в указанное c расположение), вы используете отправку и предоставляете правильную информацию.

Темы требуют стандартной цены, а очереди могут использовать базовые c цены.

С этой информацией MassTransitAzureServiceBusMessagingService будет изменен следующим образом:

Basi c Pricing - Queues

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                var queueUri = new Uri(massTransitTransport.BusControl.Address, "registration.updatestatus");
                var endpoint = await massTransitTransport.BusControl.GetSendEndpoint(queueUri);
                await endpoint.Send<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }

Стандартные цены - Темы / Подписки

    public async Task Publish(object payload)
    {
        var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
        var cmd = JObject.Parse(jsn)["Command"];

        switch (cmd.ToString())
        {
            case "UPDATESTATUS":
                await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
                break;
            default: break;
        }
    }
0 голосов
/ 28 мая 2020

Базовый уровень Azure служебной шины c не позволяет использовать темы. Таким образом, вы не сможете использовать publi sh. Тем не менее, MassTransit на самом деле не работает с базовым уровнем c, несмотря на попытки в прошлом, которые могли быть успешными.

...