MassTransit / RabbitMQ - почему пропускается так много сообщений? - PullRequest
0 голосов
/ 08 мая 2019

Я работаю с 2 консольными приложениями .NET Core в сценарии производитель / потребитель с MassTransit / RabbitMQ. Я должен убедиться, что даже если ни один потребитель не запущен, сообщения от производителя все еще успешно помещаются в очередь. Похоже, что это не работает с Publish () - сообщения просто исчезли, поэтому я использую Send (). Сообщения по крайней мере помещаются в очередь, но без каких-либо потребителей, выполняющих сообщения, все попадают в очередь «_skipped».

Итак, это мой первый вопрос : это правильный подход, основанный на требовании (даже если НЕТ потребителей работают, сообщения от производителя по-прежнему успешно ставятся в очередь)?

С Send () мой потребитель действительно работает, но все же многие сообщения проваливаются и попадают в очередь "_skipped". Логика потребителя минимальна (просто регистрирует сообщение в данный момент), поэтому это не длительный процесс.

Итак, это мой второй вопрос : почему так много сообщений все еще выбрасывается в очередь "_skipped"?

И это приводит к моему третьему вопросу : означает ли это, что мой потребитель также должен прослушивать очередь "_skipped"?


Я не уверен, какой код вам нужен для ответа на этот вопрос, но вот скриншот из пользовательского интерфейса управления RabbitMQ:

RabbitMQ queues

Конфигурация производителя:

    static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      {
                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          {
                              cfg.AddBus(ConfigureBus);
                          });

                          services.AddHostedService<CardMessageProducer>();
                      })
                      .UseConsoleLifetime()
                      .UseSerilog();
    }

    static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            {
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            });

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            {
                EndpointConvention.Map<CardMessage>(e.InputAddress);
            });
        });
    }

Код производителя:

Bus.Send(message);

Конфигурация потребителя:

    static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host.CreateDefaultBuilder()
                      .ConfigureServices((hostContext, services) =>
                      {
                          services.AddSingleton<CardMessageConsumer>();

                          services.Configure<ApplicationConfiguration>(hostContext.Configuration.GetSection(nameof(ApplicationConfiguration)));

                          services.AddMassTransit(cfg =>
                          {
                              cfg.AddBus(ConfigureBus);
                          });

                          services.AddHostedService<MassTransitHostedService>();
                      })
                      .UseConsoleLifetime()
                      .UseSerilog();
    }

    static IBusControl ConfigureBus(IServiceProvider provider)
    {
        var options = provider.GetRequiredService<IOptions<ApplicationConfiguration>>().Value;

        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(options.RabbitMQ_ConnectionString), h =>
            {
                h.Username(options.RabbitMQ_Username);
                h.Password(options.RabbitMQ_Password);
            });

            cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName, e =>
            {
                e.Consumer<CardMessageConsumer>(provider);
            });

            //cfg.ReceiveEndpoint(host, typeof(CardMessage).FullName + "_skipped", e =>
            //{
            //    e.Consumer<CardMessageConsumer>(provider);
            //});
        });
    }

Код потребителя:

class CardMessageConsumer : IConsumer<CardMessage>
{
    private readonly ILogger<CardMessageConsumer> logger;
    private readonly ApplicationConfiguration configuration;
    private long counter;

    public CardMessageConsumer(ILogger<CardMessageConsumer> logger, IOptions<ApplicationConfiguration> options)
    {
        this.logger = logger;
        this.configuration = options.Value;
    }

    public async Task Consume(ConsumeContext<CardMessage> context)
    {
        this.counter++;

        this.logger.LogTrace($"Message #{this.counter} consumed: {context.Message}");
    }
}

Ответы [ 2 ]

0 голосов
/ 08 мая 2019

В MassTransit очередь _skipped является реализацией концепции очереди недоставленных писем . Сообщения попадают туда, потому что они не расходуются.

MassTransit с RMQ всегда доставляет сообщение в обмен , а не в очередь . По умолчанию каждая конечная точка MassTransit создает (если нет существующей очереди) очередь с именем конечной точки, обмен с тем же именем и связывает их вместе. Когда у приложения есть настроенный потребитель (или обработчик), также создается обмен для этого типа сообщения (используя тип сообщения в качестве имени обмена), и обмен конечной точкой привязывается к обмену типа сообщения. Таким образом, когда вы используете Publish, сообщение публикуется для обмена типами сообщений и доставляется соответствующим образом с использованием привязки конечной точки (или нескольких привязок). Когда вы используете Send, тип обмена сообщениями не используется, поэтому сообщение попадает непосредственно в пункт назначения. И, как правильно сказал @maldworth, каждая конечная точка MassTransit ожидает только получение сообщений, которые она может использовать. Если он не знает, как использовать сообщение - сообщение перемещается в очередь недоставленных сообщений. Это, как и очередь ядовитых сообщений, являются основными шаблонами обмена сообщениями.

Если вам нужно, чтобы сообщения были поставлены в очередь для последующего использования, лучше всего настроить проводку, но сама конечная точка (я имею в виду приложение) не должна работать. Как только приложение запустится, оно будет использовать все сообщения в очереди.

0 голосов
/ 08 мая 2019

Когда потребитель запускает шину bus.Start(), он создает все обмены и очереди для транспорта.Если у вас есть требование, чтобы публикация / отправка происходила до того, как пользователь получит доступ, ваш единственный вариант - запустить DeployTopologyOnly.К сожалению, эта функция не задокументирована в официальных документах, но здесь приведены модульные тесты: https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.RabbitMqTransport.Tests/BuildTopology_Specs.cs

Пропущенная очередь возникает, когда сообщения отправляются потребителю, который не знает, как обрабатывать.

Например, если у вас есть потребитель, который может обработать IConsumer<MyMessageA>, который находится на имени конечной точки получения "my-queue-a".Но тогда ваш производитель сообщений делает Send<MyMessageB>(Uri("my-queue-a")...), ну, это проблема.Потребитель понимает только A, он не знает, как обрабатывать B. И поэтому он просто перемещает его в пропущенную очередь и продолжает.

...