Мы используем RabbitMQ и MassTransit для обмена сообщениями между издателями и потребителями.Иногда мы получаем сообщение ниже исключения, и тогда все наши передачи сообщений внезапно прекращаются.Ниже я также публикую наш код конфигурации шины.
One or more errors occurred. (Timeout waiting for response,
RequestId: 00c60000-0aff-0242-e641-08d6a323ec24)",
"MassTransit.RequestTimeoutException: Timeout waiting for response,
RequestId: 00c60000-0aff-0242-e641-08d6a323ec24 at MassTransit.Clients.ResponseHandlerConnectHandle`1.GetTask()
at MassTransit.Clients.ClientRequestHandle`1.HandleFault() at MassTransit.Clients.ResponseHandlerConnectHandle`1.GetTask()
at MassTransit.Clients.RequestClient`1.GetResponse[T](TRequest message, CancellationToken cancellationToken, RequestTimeout timeout)
Вот наша конфигурация шины
service.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
if (startQuartz)
{
startQuartz = false;
MicroServiceConfig.MicroServiceHost.StartScheduledServices();
}
var config = provider.GetRequiredService<SLOTServerConfiguration>();
var vh = GetVirtualHost(config);
var url = config.RabbitMQHost + (config.RabbitMQHost.EndsWith("/") ? "" : "/") + vh;
var host = cfg.Host(new Uri(url), hst =>
{
hst.Username(config.RabbitMQUserName);
hst.Password(config.RabbitMQPassword);
});
if (!isApi)
{
cfg.AddReceiveEndpoint(host, provider, service, isApi);
host.ConnectConsumeObserver(new UserObserver());
host.ConnectSendObserver(new ErrorObserver());
}
}));
service.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
service.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
service.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
service.AddSingleton<IHostedService, BusService>();
service.AddMassTransit(x =>
{
var method = x.GetType().GetMethod("AddConsumer");
foreach (var item in RegisteredTypes)
{
method.MakeGenericMethod(item.ConsumerType).Invoke(x, new object[0]);
}
});
}
private static void AddReceiveEndpoint(this IRabbitMqBusFactoryConfigurator cfg, IRabbitMqHost host, IServiceProvider provider, IServiceCollection service, bool registerClient)
{
var queueName = DateTime.Now.ToFileTimeUtc().ToString() + JsonConvert.SerializeObject(MicroServiceConfig.MicroServiceHost.ServiceNames).GetHashCode();
cfg.ReceiveEndpoint(host, queueName, e =>
{
e.AutoDelete = true;
e.PrefetchCount = 16;
e.UseMessageRetry(x => x.Interval(2, 100));
e.LoadFrom(provider);
});
}
private static string GetVirtualHost(SLOTServerConfiguration config)
{
var vh = config.RabbitMQVirtualHost;
if(string.IsNullOrEmpty(vh))
{
vh = Environment.MachineName;
}
}
Обновление: ниже вы можете найти код издателя и потребителяблоки
var res = ThreadClient.GetResponse<AddExternalOutboundOrderListRequestModel, OutboundOrderResponseItem>(
_requestClient,
new ServerRequest<AddExternalOutboundOrderListRequestModel> { Model = model, Token = Request.Headers["token"] }
);
var ret = BaseResponseModel.Ok(res.Data, Request.Headers["RequestID"]);
ret.ExceptionMessage = res.ThreadExceptions;
return ret;
Потребитель:
public Task Consume(ConsumeContext<ServerRequest<AddExternalOutboundOrderListRequestModel>> context)
{
var response = _outboundOrderWorkFlow.Add(context.Message.Model);
ServerResponse<OutboundOrderResponseItem> res = new ServerResponse<OutboundOrderResponseItem>
{
Data = response
};
context.Respond(res);
return Task.Delay(0);
}