Преобразование слушателя для потока в IObservable - PullRequest
1 голос
/ 14 ноября 2011

Мой сервисный метод берет слушателя и вызывает его с потоком данных.Я пытаюсь преобразовать этот поток в IObservable<T>.Пока это то, что я сделал:

public class MessageListener : IMessageListener
{

    private readonly Subject<string> stream = new Subject<string>();

    public IObservable<string> MessageStream
    {
        get
        {
            return this.stream;
        }
    }

    public void OnMessageAdded(string message)
    {
        this.stream.OnNext(message);
    }

}

//Calling code    
public IObservable<string> GetMessage()
{
     var listener = new MessageListener();
     service.Subscribe(listener);
     listener.MessageStream.SubscribeOn(Scheduler.NewThread);
}

Я не уверен, достаточно ли это хорошо.Я считаю, что звонок на SubscribeOn будет запускать только код подписки в новой теме.Как я могу убедиться, что OnMessageAdded получен новым потоком?

1 Ответ

1 голос
/ 14 ноября 2011

Как вы уже заметили, SubscribeOn будет контролировать только то, где будет происходить подписка.Тем не менее, в вашем коде SubscribeOn не действует вообще, потому что за ним не следует подписка.Помните, что SubscribeOn возвращает Observable, который будет использоваться для подписки.Дело не в том, что вы можете вызвать «SubscribeOn», чтобы установить какой-либо глобальный флаг внутри источника или что-то в этом роде.

Что вы на самом деле хотите сделать, так это вызвать «ObserveOn» внутри вашего сервиса непосредственно перед подпиской.ObserveOn определяет поток обработки входящих сообщений.Другой вариант - записать ObserveOn непосредственно в MessageListener, чтобы он выглядел следующим образом:

public IObservable<string> MessageStream
{
    get
    {
        return this.stream.ObserveOn(Scheduler.ThreadPool).AsObservable();
    }
}

Еще одна вещь, которую следует отметить, в вашем свойстве MessageStream лучше вызывать this.stream.AsObservable(), а не возвращать напрямуюпредмет.

...