Я новичок в MassTransit и Azure Service Bus. Я пытаюсь использовать архитектуру, в которой RabbitMq или Azure служебная шина используется в API. NET Core 3.1. У меня есть часть RabbitMq, и я только что запустил сервисную шину Azure. У меня есть API, который принимает входящие данные и публикует sh их в очереди. Когда я пытаюсь опубликовать sh с помощью подхода Azure служебной шины, я получаю сообщение об ошибке "SubCode = 40000. Не могу работать с типом Topi c, потому что пространство имен servicehubqa использует Basi c 'tier.
Я пытаюсь использовать подход с использованием очереди и надеюсь создать очередь по мере публикации сообщений. В настоящее время служебная шина использует ценовой уровень Basi c, как указано в документации. что я могу играть с очередями на этом уровне. Я не уверен, нужно ли мне вручную создавать очередь (мне пришлось сделать этот подход с помощью RabbitMq, поскольку очередь не будет создана, если нет потребителя). По умолчанию используется topi c подход, если ничего не указано? Как указать очередь vs topi c?
Мой код выглядит следующим образом:
Запуск - ConfigureServices
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton(Configuration);
services.AddScoped<IMassTransitRabbitMqTransport, MassTransitRabbitMqTransport>();
services.AddScoped<IMassTransitAzureServiceBusTransport, MassTransitAzureServiceBusTransport>();
var messageProvider = ConfigProvider.GetConfig("MessageService", "Messaging_Service");
switch (messageProvider)
{
case "AzureServiceBus":
services.AddScoped<IMessagingService, MassTransitAzureServiceBusMessagingService>();
break;
case "RabbitMq":
services.AddScoped<IMessagingService, MassTransitRabbitMqMessagingService>();
break;
default:
throw new ArgumentException("Invalid message service");
};
services.AddControllers();
}
Контроллер
public class ListenerController : ControllerBase
{
readonly ILogger<ListenerController> logger;
readonly IMessagingService messenger;
public ListenerController(
ILogger<ListenerController> logger,
IMessagingService messenger)
{
this.logger = logger;
this.messenger = messenger;
}
[HttpPost]
public async Task<IActionResult> Post()
{
var payload = new
{
...
};
await messenger.Publish(payload);
return Ok();
}
}
IMessagingService
public interface IMessagingService
{
Task Publish(object payload);
}
IMassTransitTransport
public interface IMassTransitTransport
{
IBusControl BusControl { get; }
}
public interface IMassTransitRabbitMqTransport : IMassTransitTransport { }
public interface IMassTransitAzureServiceBusTransport : IMassTransitTransport { }
MassTransitAzureServiceBusTransport
public sealed class MassTransitAzureServiceBusTransport : IMassTransitAzureServiceBusTransport
{
public IBusControl BusControl { get; }
public MassTransitAzureServiceBusTransport()
{
BusControl = ConfigureBus();
BusControl.StartAsync();
}
IBusControl ConfigureBus()
{
return Bus.Factory.CreateUsingAzureServiceBus(config => {
var host = config.Host(ConfigProvider.GetConfig("AzureServiceBus", "AzureServiceBus_ConnStr"), host => { });
});
}
}
MassTransitAzureServiceBusMessagingService
public class MassTransitAzureServiceBusMessagingService : IMessagingService
{
readonly IMassTransitAzureServiceBusTransport massTransitTransport;
public MassTransitAzureServiceBusMessagingService(IMassTransitAzureServiceBusTransport massTransitTransport)
{
//transport bus config already happens in massTransitTransport constructor
this.massTransitTransport = massTransitTransport;
}
public async Task Publish(object payload)
{
var jsn = Newtonsoft.Json.JsonConvert.SerializeObject(payload);
var cmd = JObject.Parse(jsn)["Command"];
switch (cmd.ToString())
{
case "UPDATESTATUS":
//IRegisterCommandUpdateStatus is an interface specifying the properties needed
await massTransitTransport.BusControl.Publish<IRegisterCommandUpdateStatus>(payload);
break;
default: break;
}
}
}