Как я могу вручную обработать любой подписанный на тип сообщения в NServiceBus? - PullRequest
2 голосов
/ 14 марта 2011

Я пытаюсь создать слой поверх NServiceBus, чтобы его было проще использовать другим разработчикам.

Я пытаюсь обойтись без файла конфигурации и сумел заставить издателя работать:

public class NServiceBusPublisher
{
    private IBus _Bus { get; set; }

    public void NServiceBusPublisher(string argInputQueue, string argErrorQueue)
    {
        Configure configure = NServiceBus.Configure.With().DefaultBuilder();
        var transport = configure.Configurer.ConfigureComponent<MsmqTransport>(ComponentCallModelEnum.Singleton);

        transport.ConfigureProperty(t => t.InputQueue, argInputQueue);
        transport.ConfigureProperty(t => t.ErrorQueue, argErrorQueue);
        transport.ConfigureProperty(t => t.NumberOfWorkerThreads, 1);
        transport.ConfigureProperty(t => t.MaxRetries, 5);

        _Bus =
            configure
            .XmlSerializer()
            .MsmqTransport()
                        .IsTransactional(true)
                        .PurgeOnStartup(false)
                    .MsmqSubscriptionStorage()  
            .UnicastBus()
                .ImpersonateSender(false)
            .CreateBus()
            .Start();

    }

    public void Publish(NServiceBus.IMessage argMessage)
    {
        _Bus.Publish(argMessage);
    }
}

Я также хочу иметь подписчика NServiceBus и предоставить разработчикам возможность подписаться на любое количество типов сообщений, если оно наследуется от NServiceBus.IMessage:

    public class NServiceBusSubscriber      
{
    private IBus _Bus { get; set; }

    public void NServiceBusSubscriber(string argInputQueue, string argOutputQueue, string argErrorQueue, string messagesAssembly)
    {
        Configure configure = NServiceBus.Configure.With().DefaultBuilder();

        var transport = configure.Configurer.ConfigureComponent<MsmqTransport>(ComponentCallModelEnum.Singleton);
        transport.ConfigureProperty(t => t.InputQueue, argInputQueue);
        transport.ConfigureProperty(t => t.ErrorQueue, argErrorQueue);
        transport.ConfigureProperty(t => t.NumberOfWorkerThreads, 1);
        transport.ConfigureProperty(t => t.MaxRetries, 5);

        var ucb = configure.Configurer.ConfigureComponent<NServiceBus.Unicast.UnicastBus>(ComponentCallModelEnum.Singleton);
        ucb.ConfigureProperty(u => u.MessageOwners, new Dictionary<string,string>()
        {
            {messagesAssembly, argOutputQueue}
        });

        _Bus =
            configure
            .XmlSerializer()
            .MsmqTransport()
                        .IsTransactional(true)
                        .PurgeOnStartup(false)
                    .MsmqSubscriptionStorage()                      
            .UnicastBus()
                .ImpersonateSender(false)
            .DoNotAutoSubscribe()
            .CreateBus()
            .Start();

    }

    public void Subscribe<T>() where T : NServiceBus.IMessage
    {
        _Bus.Subscribe<T>();
    }
} 

Проблема в том, что я не смог найти способ прикрепить обработчик события к определенному типу сообщения.

Не могли бы вы помочь мне понять это?

Большое спасибо.

Ответы [ 3 ]

1 голос
/ 20 мая 2011

Прошло много времени с тех пор, как вопрос был задан, поэтому я не уверен, была ли проблема решена, но вот один из способов, которым вы можете сделать это с помощью Bus.Subscribe (хотя, как говорили другие респонденты, это не так предписанный способ сделать это NServiceBus)

Подписаться на тип сообщения с использованием перегрузки подписки

    void Subscribe(Type messageType, Predicate<IMessage> condition);

Тогда вы можете обработать сообщение в делегате

    private bool Handle(NServiceBus.IMessage nsbMsg)
    {
       //you get the message instance that you can handle
       //return true
    }

Итак, ваш код будет

    class MySubscriber
    {
      public IBus Bus {get; set;}

      public void Subscribe()
      {
        Bus.Subscribe(typeof(MyMessage), Handle);
      }    

      public void Handle(NServiceBus.IMessage nsbMsg)
      {
        var msg = nsbMsg as MyMessage;

        //your code
        return true;
      } 
    }

Однако обратите внимание, что, выполняя это, вы должны сами управлять временем жизни обработчика, которое в противном случае управлялось бы для вас NServiceBus с использованием инфраструктуры IOC по вашему выбору.

Вы также должны будете явно передать ссылку на IBus, которая будет автоматически введена для вас, если вы только внедряете интерфейс IHandleMessage.

Архитектурный момент здесь заключается в том, что NSB является полноценным «ESB», а не просто уровнем обмена сообщениями. Добавление другого слоя поверх вашего ESB - ИМХО слишком много абстракции.

0 голосов
/ 14 марта 2011

NServiceBus автоматически обрабатывает подписку на сообщения.Когда вы вызываете Configure.With () .... Start ();NServiceBus будет сканировать, чтобы определить, какие сборки реализуют IHandleMessages (SomeMessage), и отправит запрос подписки издателю.

Когда вы добавляете «DoNotAutoSubscribe», вам нужно вручную получить все обрабатываемые сообщения и выполнитьBus.Subscribe () для каждого из них.

Помимо этого, NServiceBus автоматически обрабатывает маршрутизацию входящего сообщения соответствующему обработчику.В приведенном выше коде подписчика вы получаете сообщение об ошибке или сообщения исчезают из очереди?

0 голосов
/ 14 марта 2011

Поскольку вы не подписываетесь автоматически, первое, что вам нужно сделать, это подписаться на тип сообщения через Bus.Subscribe (). Другие могут сделать это в точке расширения IWantToRunAtStartUp (где-нибудь реализовать интерфейс в классе). Оттуда каждый подписчик будет реализовывать интерфейс IHandleMessages . Реализация этого интерфейса связывает вас с сообщением, где «T» - это тип сообщения.

Когда NSB запускается, он сканирует локальный каталог bin, находит все реализации вашего интерфейса и подключает их от вашего имени внутри страны. Оттуда он отправит правильному обработчику, когда придет сообщение такого типа.

...