Как создать IObservable <T>, который читает из очереди сообщений MSMQ? - PullRequest
4 голосов
/ 08 февраля 2012

Я удаляю нашу систему электронной почты с нашего сайта ASP.NET, который раньше сразу отправлял электронные письма с системой для обработки запросов в отдельном сервисе, чтобы уменьшить нагрузку на сайт.Я пытаюсь создать его вокруг набора интерфейсов, чтобы я мог поменять местами реализации, если захочу, но изначально он будет основан на очереди сообщений (MSMQ) для отправки запросов в очередь, чтобы служба получала входящие запросыа затем обработать их.В настоящее время у меня примерно определены следующие интерфейсы:

// Sends one or more requests to be processed somehow
public interface IRequestSender
{
    void Send(IEnumerable<Request> requests);
}

// Listens for incoming requests and passes them to an observer to do the real work
public interface IRequestListener : IObservable<Request>
{
    void Start();
    void Stop();
}

// Processes a request given to it by a IRequestListener
public interface IRequestProcessor : IObserver<Request>
{
}

Вы заметите, что слушатель и процессор используют наблюдаемый шаблон, поскольку, как мне кажется, он подходит лучше всего.

Моя проблемавыясняет, как написать реализацию IRequestListener, которая получает от MSMQ, в основном, как мне создать подходящий IObservable<T>?

Мой первый вариант, который я нашел, - это создать IObservable<T> с нуля на основена примере, приведенном в документации MSDN , но это кажется большой работой сантехники.

Другой вариант - использовать Reactive Extensions, так как он, кажется, предназначен для его создания.проще создавать наблюдаемые.Наиболее близкими к использованию Rx с MSMQ я нашел следующие страницы:

Но я не уверен, как я могу применить эти примеры к моему IRequestListener interface.

Любые другие идеи также приветствуются, даже изменения в моем базовом дизайне, если они подходят.

1 Ответ

5 голосов
/ 09 февраля 2012

Сначала я использовал FromAsyncPattern, но потом написал для него класс, потому что он лучше справлялся с таймаутом и отравленными сообщениями.После запуска очереди в любом случае являются горячими наблюдаемыми.Вы также можете использовать Observable.Defer, чтобы приблизить его к Rx вместо Start / Stop.

Вот базовая реализация QueueObservable.Вы можете просто начать, позвонив ListenReceive.

Subject<T> Subject = new Subject<T>();

protected void ListenReceive()
{
    Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
}

protected void OnReceive(IAsyncResult ar)
{
    Message message = null;

    try
    {
        message = Queue.EndReceive(ar);
    }
    catch (TimeoutException ex)
    {
        //retry?
    }

    if (message != null)
        Subject.OnNext((T) message.Body);

    Thread.Yield();

    if (!IsDisposed)
        ListenReceive();
}    

public IObservable<T> AsObservable()
{
        return Subject;
}
...