Использовать Rx с брокером типизированных сообщений - PullRequest
0 голосов
/ 04 октября 2010

У меня есть брокер типизированных сообщений, аналогичный тому, который предоставляет Caliburn:

public interface IMessageBroker
{
    void Publish<T>(T message);
    IDisposable Subscribe<T>(Action<T> subscriber);
}

Как мне преобразовать подписки в IObservable?

Мне нужен метод расширения, что-то вроде этого:

public static IObservable<T> Subscribe<T>(this IMessageBroker messageBroker)
{
    var subject = new Subject<T>();
    messageBroker.Subscribe<T>(subject.OnNext);
    return subject;
}

проблема в этой реализации заключается в том, что я не могу отписаться, и поэтому она протекает.

Лучшее имя для метода подписки также приветствуется.

Ответы [ 2 ]

3 голосов
/ 04 октября 2010

Попробуйте (не проверено):

public static IObservable<T> ToObservable<T>(this IMessageBroker messageBroker)
{
    IObservable<T> observable = Observable.CreateWithDisposable<T>(o =>
        {
            return messageBroker.Subscribe<T>(o.OnNext);
        });
    return observable;
}

Что вы должны использовать следующим образом:

var observableBroker = messageBroker.ToObservable<int>();
var subject = new Subject<int>();
observableBroker.Subscribe(subject.OnNext);

//alternatively, there are overloads of Observerable.Subscribe which take lambdas:
observableBroker.Subscribe(t => DoSomethingWith(t));
0 голосов
/ 20 октября 2016

Попробуйте это ( Проверено )

Как я могу преобразовать подписки в IObservable?

Вы можете, используя Observable.Createпутем создания следующего метода расширения:

public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
{
    return Observable.Create<T>(observer => messageBroker.Subscribe<T>(observer.OnNext));
}

Примечание: System.Reactive пакет nuget не имеет Observable.CreateWithDisposable

или без Rx (почему? может быть не нужны зависимости):

public static IObservable<T> AsObservable<T>(this IMessageBroker messageBroker)
{
    return new DelegateObservable(observer => messageBroker.Subscribe<T>(observer.OnNext));
}

public class DelegateObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> subscriber;

    public DelegateObservable(Func<IObserver<T>, IDisposable> subscriber)
    {
        this.subscriber = subscriber;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return this.subscriber(observer);
    }
}
...