Я использую Masstransit.RabbitMQ с моим основным приложением dotnet. Я строю фреймворк, который обернет Masstransit и, если потребуется, сможет подключить и подключить другую сервисную шину в будущем.
Я получаю сообщение об ошибке в следующем LOC:
TaskUtil.Await(BusControl.Publish<Message<TMessage>>(message, c => { c.Durable = true; }));
Обычно работает нормально. Но когда я пытаюсь опубликовать более 50000 сообщений, я получаю следующую ошибку после правильной доставки 80% сообщений:
Уже закрыто: операция AMQP была прервана: причина закрытия AMQP,> инициированная библиотекой, код = 541, текст = "неожиданное исключение", classId = 0,> methodId = 0, причина = System.IO.IOException: Невозможно прочитать данные из> транспортного соединения: существующее соединение было принудительно закрыто> удаленным хостом. ---> System.Net.Sockets.SocketException: существующее соединение> было принудительно закрыто удаленным хостом
в System.Net.Sockets.NetworkStream.Read (буфер Byte [], смещение Int32,> размер Int32)
--- Конец внутренней трассировки стека исключений ---
в RabbitMQ.Client.Impl.InboundFrame.ReadFrom (читатель NetworkBinaryReader)
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration ()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop ()
Я использую синглтон для создания соединения.
Не уверен, почему соединение не удается после точки.
Моя цель - создать надежную среду, которая сможет обрабатывать любое количество сообщений.
Я попытался добавить сердцебиение в 5 секунд и установить PublishConfirms как true. Он просто минимизировал TAT для появления ошибки. Для справки, Prefetch установлен на 16. Я проверил многих пользователей, которые сталкивались с подобной проблемой, но не смогли найти решение этой проблемы.
return MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(settings.RabbitMQURL), hst =>
{
hst.Username(settings.RabbitMQUserName);
hst.Password(settings.RabbitMQPassword);
hst.Heartbeat(5);
hst.PublisherConfirmation = true;
});
foreach (Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> action in settings.BeforeBuildActions)
{
action.Invoke(cfg, host);
}
});
Я ожидаю, что 100000 сообщений будут опубликованы и успешно использованы. Но я получаю ошибку на полпути.