MassTransit (RabbitMq) + Asp.Net Core 2.1: несколько потребителей в одном проекте - PullRequest
0 голосов
/ 05 октября 2018

У меня есть проект Asp.Net 2.1, который выступает в качестве хоста службы для потребления сообщений, опубликованных другими процессами / приложениями.Я настроил / настроил несколько потребителей в классе Startup (Startup.cs), как показано ниже (для краткости здесь указана только часть MassTransit):

public void ConfigureServices(IServiceCollection services)
{   
    services.AddScoped<SendMessageConsumer>();
    services.AddScoped<AnotherMessageConsumer>();
    services.AddMassTransit(c =>
    {
        c.AddConsumer<SendMessageConsumer>();
        c.AddConsumer<AnotherMessageConsumer>();
    });

    services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host("localhost", "/", h => { });

        cfg.ReceiveEndpoint(host, "Queue-1", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));

            e.LoadFrom(provider);
            e.Consumer<SendMessageConsumer>();
            EndpointConvention.Map<Message>(e.InputAddress);                    
        });

        cfg.ReceiveEndpoint(host, "Queue-2", e =>
        {
            e.PrefetchCount = 16;
            e.UseMessageRetry(x => x.Interval(2, 100));

            e.LoadFrom(provider);
            e.Consumer<AnotherMessageConsumer>();
            EndpointConvention.Map<AnotherMessage>(e.InputAddress);
        });
    }));

    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IHostedService, BusService>();   
}

Сообщения:

namespace MasstransitDemo.Models
{
    public class Message
    {
        public string Value { get; set; }
    }

    public class AnotherMessage
    {
        public string Value { get; set; }
    }
}

Потребители:

public class SendMessageConsumer : IConsumer<Message>
{
    public Task Consume(ConsumeContext<Message> context)
    {
        Console.WriteLine($"Receive message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

public class AnotherMessageConsumer : IConsumer<AnotherMessage>
{
    public Task Consume(ConsumeContext<AnotherMessage> context)
    {
        Console.WriteLine($"Receive another message value: {context.Message.Value}");
        return Task.CompletedTask;
    }
}

Это заставляет оба сообщения проходить в каждую очередь.Смотрите итоговые биржи RabbitMq ниже:

enter image description here enter image description here

Как настроить его так, чтобы SendMessageConsumer получал только "Сообщение "и AnotherMessageConsumer для получения" AnotherMessage "?

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 18 октября 2018

Это не работает для меня.Вы прокомментировали оба e.loadfrom(provider).Пожалуйста, поделитесь фрагментом кода, чтобы понять,

services.AddSingleton(provider => Bus.Factory.CreateUsingRabbitMq(configurator =>
    {
        IRabbitMqHost rabbitMqHost=configurator.Host(_busConfiguration.RabbitMqUri, _busConfiguration.Port, _busConfiguration.Vhost,hostConfigurator =>
        {
            hostConfigurator.Username(_busConfiguration.UserName);
            hostConfigurator.Password(_busConfiguration.Password);
        });

        configurator.ReceiveEndpoint(rabbitMqHost,_busConfiguration.GeneratePayLoadQueue, e =>
        {
            e.PrefetchCount = _busConfiguration.PrefetchCount;
            //e.LoadFrom(provider);
            e.Consumer<StagingConsumerService>(provider);
            EndpointConvention.Map<StagingConsumer>(e.InputAddress);
        });

        configurator.ReceiveEndpoint(rabbitMqHost, _busConfiguration.CreateJournalQueue , e =>
        {
            e.PrefetchCount = _busConfiguration.PrefetchFinDocCount;
            //e.LoadFrom(provider);
            e.Consumer<FinDocConsumerService>();
            EndpointConvention.Map<FinDocConsumer>(e.InputAddress);
        });
    }));

    services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
    services.AddSingleton<IHostedService, BusService>();
0 голосов
/ 05 октября 2018

Вы сообщаете MassTransit явно о своем потребителе, но также загружаете всех потребителей из контейнера для каждой конечной точки.

e.LoadFrom(provider);
e.Consumer<AnotherMessageConsumer>();

Делая это, вывсе оба потребителя к каждой конечной точке с LoadFrom, плюс один потребитель дополнительно к Consumer<T>.Таким образом, каждая из ваших конечных точек получает трех потребителей, и вы связываете обе очереди с обеими биржами.

Вам не нужно использовать LoadFromContainer здесь.Если у вашего потребителя есть зависимость, которую должен разрешить контейнер, вы можете использовать это:

e.Consumer<AnotherMessageConsumer>(container);
...