Как зарегистрировать универсальный потребительский адаптер в MassTransit, если у меня есть список типов сообщений - PullRequest
0 голосов
/ 14 октября 2018

Я успешно использую MassTransit для глупого примера приложения, где я публикую сообщение (событие) из консольного приложения Publisher и получаю его от двух разных потребителей, которые также являются консольными приложениями, использующими RabbitMq.

Это весь пример проекта git repo: https://gitlab.com/iberodev/DiDrDe.MessageBus

Я хочу иметь проект, включающий функциональность MassTransit, чтобы мои проекты Publisher и Consumers ничего не знали о MassTransit.Зависимости должны идти в этом направлении:

  • DiDrDe.MessageBus ==> MassTransit
  • DiDrDe.MessageBus ==> DiDrDe.Contracts
  • DiDrDe.Model ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDr
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrD27.ModeDiDrDe.ConsumerTwo ==> DiDrDe.Contracts
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

Ответы [ 2 ]

0 голосов
/ 17 октября 2018

Я отправил запрос на отладку вашей проблемы, которая работает для меня, потребитель начинает без проблем.

Вы можете расширить его, чтобы включить автоматическую регистрацию всех потребителей на их собственных конечных точках, если это необходимо.

Хитрость заключалась в том, чтобы правильно зарегистрировать типы обработчиков с соответствующими типами интерфейса.Немного типа механика, но у меня это работает с моей стороны.

0 голосов
/ 15 октября 2018

Взгляните на пользовательское соглашение для потребителей здесь: https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional

Создайте свой собственный IMyConsumerInterface, IMyMessageInterface вставьте его в код из этого теста.Зарегистрируйте ConsumerConvention.Register<CustomConsumerConvention>(); перед созданием автобуса.Это должно работать.

Кроме того, вы можете создать свою собственную оболочку для контекста потребителя и передать ее вместе с сообщением.

LoadFrom (MassTransit.AutofacIntegration) у меня не работаетс пользовательским соглашением, поэтому мне пришлось вручную регистрировать потребителей

foreach (var consumer in consumerTypes)
  cfg.Consumer(consumer, (Type x) => _container.Resolve(x));

В качестве альтернативы , если вы хотите использовать "прокси" подход, сделайте что-то вроде:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface 
    where TConsumer : IMyConsumerInterface<TMessage>
{
    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _consumer.Consume(context.Message);
    }
}

...

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));


дополнительный код, применимый к обоим подходам
// marker interface
public interface IMyConsumerInterface
{
}

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface 
{
    Task Consume(T message);
}

...

builder.RegisterAssemblyTypes(ThisAssembly)
           .AssignableTo<IMyConsumerInterface>()
           .AsSelf()
           .As<IMyConsumerInterface>();

...

var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();


RE: ОБНОВЛЕНИЕ 5
builder.Register(context =>
{
    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        {
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
        });
    });
    return busControl;
})
...