Баланс MassTransit Round-Robin продолжает отправлять сообщения в одну и ту же очередь, даже если он занят - PullRequest
1 голос
/ 06 февраля 2020

Я пытаюсь настроить MassTransit с RabbitMQ для своего приложения.

Я использую CastleWindsor DI и. Net Framework 4.7.

Способ регистрации моих потребителей и шины в Program.cs:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitHostName);

        x.AddConsumer<FactAddedFirstHandler>();
        x.AddConsumer<FactAddedSecondHandler>();

        cfg.ReceiveEndpoint("addedFactQueue", ec =>
        {    
            cfg.ConfigureEndpoints(container);
        });
    }));

});

IBusControl busControl = container.Kernel.Resolve<IBusControl>();
busControl.StartAsync();

Пример обработчика:

internal class FactAddedFirstHandler : IConsumer<FactAddedEvent>
{

    public async Task Consume(ConsumeContext<FactAddedEvent> context)
    {
        //Do stuff
    }
}

Давайте назовем это основное работающее приложение просто как Приложение . Я хочу добиться двух разных вещей:

1) всякий раз, когда FactAddedEvent генерируется (из другой службы), тогда Приложение доставляет сообщение обоим потребителям: FactAddedFirstHandler и FactAddedSecondHandler.

Это происходит правильно, потому что, если я правильно понял, MassTransit создает обмен и доставляет сообщение всем потребителям, подписавшимся на это сообщение.

2), если я запускаю 2 экземпляра Приложение (назовем их Приложение1 и Приложение2 ), затем применяется Round-Robin. Это означает, что RabbitMQ балансирует сообщения на бесплатную версию Application .

Это происходит частично правильно. Это означает, что если у меня есть оба istances и я пытаюсь отправить FactAddedEvent , то один раз событие будет получено Application1 , а один раз - Application2 . Кроме того, если я приведу, например, Application2 , то все сообщения будут получены Application1 . Пока что все работает правильно.

ПРОБЛЕМА: проблема, с которой я сталкиваюсь, возникает при добровольной блокировке, например Application1 . Допустим, я отправляю FactAddedEvent , он принимается Application1 , но в методе использования обработчика я устанавливаю точку останова и жду там. Поэтому здесь я добровольно блокирую Приложение1 . Теперь я ожидаю, что поскольку Application1 заблокировано, RabbitMQ доставит все последующие события, которые я выбрасываю, ТОЛЬКО Application2 . Вместо этого он просто делает: как только RabbitMQ отправляет его Application1 , как только он отправляет его Application2 , независимо от того, что. Таким образом, в Application1 будет очередь сообщений, накапливающихся, даже если Application2 свободна для выполнения работы.

Что я делаю неправильно? Заранее спасибо

1 Ответ

2 голосов
/ 06 февраля 2020

Каналы RabbitMQ имеют PrefetchCount, который представляет собой количество неподтвержденных сообщений, которые брокер отправит потребителю, прежде чем приостановить ожидание подтверждения этих сообщений. По умолчанию MassTransit устанавливает это значение равным 16 или числу ядер ЦП x 2, в зависимости от того, что больше. Этот параметр предназначен для каждой конечной точки приема.

При запуске нескольких экземпляров брокер на основе своего алгоритма решает, какой канал / потребитель получает сообщение. Правило состоит в том, чтобы доставлять сообщения непосредственно потребителям с доступной предварительной выборкой, чтобы избежать начальной записи на диск. После того, как все предвыборные ограничения для всех пользователей достигнуты, сообщения буферизуются.

Даже в точке останова сервер считает, что вы все еще живы и пинок, поэтому он продолжит отправлять сообщения до тех пор, пока вы не заполните.

С помощью используемой конфигурации вы можете создать ConsumerDefinition для своего потребителя, чтобы изменить это значение предварительной выборки для конечной точки приема. Вы также можете просто настроить встроенный потребитель, используя синтаксис .Endpoint():

x.AddConsumer<FactAddedFirstHandler>()
    .Endpoint(x => x.PrefetchCount = 1);

Это число действительно низкое, и я бы не рекомендовал это значение в производстве , но для вашего тестирования это хорошо.

Надеюсь, это даст вам некоторые инструменты, чтобы все заработало так, как вы ожидаете.

...