Реактивный: преобразование объединенных IObservable в один поток, который действует как BehaviorSubject - PullRequest
3 голосов
/ 09 марта 2011

Вот пример кода, который у меня есть ...

var rootSubject = new Subject<Types>();

var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Forth);

var mergedSubject = Observable.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
                        .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
                        .Replay()
                        .RefCount();

rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

Console.WriteLine(String.Format("result - {0}", result));

По какой-то причине он всегда просто отключается и возвращает тип ошибки.Любая идея, что здесь происходит?

Что я пытаюсь сделать, это создать объединенный Iobservable, который является потоком, который действует как BehaviorSubject, так что если .OnNext (...) вызывается перед), сначала будет иметь значение.

Ответы [ 2 ]

3 голосов
/ 09 марта 2011

Я считаю, что проблема в том, что вы по сути не подключаете последовательность воспроизведения - или, возможно, делаете это слишком поздно.(Я не знаю деталей RefCount, но я подозреваю, что он подключается только тогда, когда на него что-то подписывается.)

Вот альтернатива, которая работает:

var mergedSubject = Observable
    .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
    .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
    .Replay();

mergedSubject.Connect();
rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

Iне знаю, удовлетворяет ли это всем, что вам нужно, но, по крайней мере, выдает правильный результат для вашего тестового кода

0 голосов
/ 10 марта 2011

Джон прав в своем подозрении, что RefCount подключается только после того, как что-то подписалось на него. Итак, если вам действительно нужен RefCount, вы также можете сделать это:

var rootSubject = new Subject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Fourth);
var mergedSubject = 
    Observable.Merge(firstSubject, secondSubject, thirdSubject, forthSubject)  
    .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
    .Replay().RefCount();

//added
mergedSubject.Subscribe();  

rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

Console.WriteLine(String.Format("result - {0}", result));
...