У меня проблемы с AzureServiceBus. Иногда (это происходит случайно), когда команда или событие выбрасывается в шину, я получаю это исключение (изображение ниже), и команда / событие не запускается, поэтому я пропускаю его.
Также иногда исключение говорит о том, что сервис отписался от некоторых тем.
Случилось ли это с кем-то?
Журналы исключений
Конфигурация запуска:
services.AddScoped<IHostedService, MassTransitHostedService>();
//Create Autofac ContainerBuilder
var containerBuilder = new ContainerBuilder();
containerBuilder.Populate(services);
// register ApplicationModule (contains all modules for application)
containerBuilder.RegisterModule(new ApplicationModule(Configuration));
return new AutofacServiceProvider(containerBuilder.Build());
Конфигурация шины:
protected override void Load(ContainerBuilder builder)
{
builder.Register(y => new ConsumerAndDeliver(y.Resolve<IIncomingMessageDeliver>())).As<ConsumerAndDeliver>().SingleInstance();
builder.Register(c =>
{
IComponentContext ctx = c.Resolve<IComponentContext>();
var consumer = ctx.Resolve<ConsumerAndDeliver>();
var bus = CreateBusInAzureServiceBusToRegister(consumer);
return bus;
})
.As<IBusControl>()
.As<IPublishEndpoint>()
.As<ISendEndpointProvider>()
.SingleInstance();
RegisterQueuesForCommands();
}
private IBusControl CreateBusInAzureServiceBusToRegister(object consumer)
{
var bus = Bus.Factory.CreateUsingAzureServiceBus(sbc =>
{
var host = sbc.Host(_configuration["myConnectionString"], h =>
{ });
sbc.ReceiveEndpoint(host, _configuration["myQueueName"], e =>
{
e.Instance(consumer);
});
});
return bus;
}
HostedService для управления жизненным циклом шины
public class MassTransitHostedService : Microsoft.Extensions.Hosting.IHostedService
{
private readonly IBusControl _busControl;
public MassTransitHostedService(IBusControl busControl)
{
_busControl = busControl;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _busControl.StartAsync(cancellationToken);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _busControl.StopAsync(TimeSpan.FromSeconds(10));
}
}
Отправка и публикация sh методы:
private readonly ISendEndpointProvider _commandSendEndpoint;
private readonly IPublishEndpoint _eventPublishEndpoint;
private async Task DispatchMessageToBus<T>(T message, CancellationToken cancellationToken = default)
{
dynamic msg = message;
switch (message)
{
case IOutcomingCommand outcomingCommand:
var type = outcomingCommand.GetType();
await _commandSendEndpoint.Send(outcomingCommand, type, cancellationToken);
break;
case IOutcomingEvent _:
await _eventPublishEndpoint.Publish(msg, cancellationToken);
break;
default:
throw new ArgumentException("Dispatch message must be IOutcomingCommand or IOutcomingEvent");
}
}