Как исправить проблему 541 для masstransit.rabbitmq (5.3.2), rabbitmq.client (5.1.0) для огромных партий опубликованных сообщений - PullRequest
0 голосов
/ 03 июня 2019

Я использую Masstransit.RabbitMQ с моим основным приложением dotnet. Я строю фреймворк, который обернет Masstransit и, если потребуется, сможет подключить и подключить другую сервисную шину в будущем.

Я получаю сообщение об ошибке в следующем LOC:

    TaskUtil.Await(BusControl.Publish<Message<TMessage>>(message, c => { c.Durable = true; }));

Обычно работает нормально. Но когда я пытаюсь опубликовать более 50000 сообщений, я получаю следующую ошибку после правильной доставки 80% сообщений:

Уже закрыто: операция AMQP была прервана: причина закрытия AMQP,> инициированная библиотекой, код = 541, текст = "неожиданное исключение", classId = 0,> methodId = 0, причина = System.IO.IOException: Невозможно прочитать данные из> транспортного соединения: существующее соединение было принудительно закрыто> удаленным хостом. ---> System.Net.Sockets.SocketException: существующее соединение> было принудительно закрыто удаленным хостом в System.Net.Sockets.NetworkStream.Read (буфер Byte [], смещение Int32,> размер Int32) --- Конец внутренней трассировки стека исключений --- в RabbitMQ.Client.Impl.InboundFrame.ReadFrom (читатель NetworkBinaryReader) at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration () at RabbitMQ.Client.Framing.Impl.Connection.MainLoop ()

Я использую синглтон для создания соединения. Не уверен, почему соединение не удается после точки. Моя цель - создать надежную среду, которая сможет обрабатывать любое количество сообщений.

Я попытался добавить сердцебиение в 5 секунд и установить PublishConfirms как true. Он просто минимизировал TAT для появления ошибки. Для справки, Prefetch установлен на 16. Я проверил многих пользователей, которые сталкивались с подобной проблемой, но не смогли найти решение этой проблемы.

        return MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(settings.RabbitMQURL), hst =>
            {
                hst.Username(settings.RabbitMQUserName);
                hst.Password(settings.RabbitMQPassword);
                hst.Heartbeat(5);
                hst.PublisherConfirmation = true;
            });


            foreach (Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> action in settings.BeforeBuildActions)
            {
                action.Invoke(cfg, host);
            }
        });

Я ожидаю, что 100000 сообщений будут опубликованы и успешно использованы. Но я получаю ошибку на полпути.

...