Реактивный: последнее значение будет получено IObservable - PullRequest
8 голосов
/ 08 марта 2011

Я знаю, что следующее является блокирующим вызовом и вернет первое значение в наблюдаемой последовательности:

var result = myObservable.First();

В зависимости от того, какой тип предмета я использую, это имеет разные значения:

  • Subject - First () будет блокироваться до следующего вызова OnNext (), что означает, в конечном итоге , что это будет самое последнее значение
  • BehaviorSubject - First () будет блокироваться, пока хотя бы одно значение не будет передано через OnNext () и поскольку BehaviorSubject отслеживает последнее значение , это будет самое последнее значение
  • ReplaySubject - First () будет блокироваться до тех пор, пока хотя бы одно значение не будет передано через OnNext (), но в случае, если многие элементы были пропущены через OnNext, оно будет первым включенным в его буфере последний

Теперь я пытаюсь найти последовательный способ получения последнего значения независимо от того, какой базовый Observable используется.

Есть идеи?

Ответы [ 2 ]

4 голосов
/ 10 марта 2011

Основываясь на вашем другом вопросе Rx, я думаю, вы хотите это:

var rootSubject = new ReplaySubject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Fourth);

var mergedSubject = Observable
              .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
        .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
        .Replay();

mergedSubject.Connect();

rootSubject.OnNext(Types.First);
rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

rootSubject.OnNext(Types.Third);
rootSubject.OnNext(Types.Fourth);

Console.WriteLine(String.Format("result - {0}", result));

Теперь не имеет значения, какой предмет используется, все они возвращают «результат - сначала».

Если вы хотите получить последнее значение перед вызовом mergedSubject.First (), вы должны использовать Replay (1):

var mergedSubject = Observable
                .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
                .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
                .Replay(1);

В этом случае все типы объектов будут возвращать «результат - второй».

3 голосов
/ 09 марта 2011

Похоже, что вы ищете Replay (что в последней версии функционально эквивалентно использованию Multicast с ReplaySubject):

IObservable<int> source = SomeHotObservable();

IConnectableObservable<int> sourceWithOneBufferedValue = source.Replay(1);

IDisposable subscription = sourceWithOneBufferedValue.Connect();

source.OnNext(5);

sourceWithOneBufferedValue.Subscribe(x => Console.WriteLine(x));
...