Как подписаться на IObservable, но хранить в нем данные, пока не будет опубликован другой IObservable? - PullRequest
2 голосов
/ 20 сентября 2011

Я хочу:

  1. Немедленно подпишитесь на IObservable<T>, но немедленно начните буферизацию любого T, который получен (т.е. еще не виден моим IObserver<T>).
  2. Выполни работу.
  3. Когда работа будет завершена, очистите буфер до моего IObserver<T> и продолжайте

Очень важно, чтобы подписка - это первое, что происходит.

В форме «мраморной диаграммы» я ищу что-то вроде этого ...

Time                  T+1   2   3   4   5   6   7   8
s1:IObservable<int>     1   2   3   4   5   6   7   8  
s2:IObservable<bool>          t    
r: IObservable<int>           1 3   4   5   6   7   8
                              2

... в том, что в момент T + 1 я подписываюсь на IObservable<bool> r, который сам зависит от IObservable<int> s1 и IObservable<bool> s2. s1 - это поток, который я не контролирую, s2 - это поток, который я контролирую (субъект), и publish, когда работа завершена.

Я думал, что SkipUntil поможет мне, но это не буферизирует события, полученные до завершения зависимого IObservable.

Вот код, который, как я думал, будет работать, но не из-за того, что SkipUntil не является буфером.

        var are = new AutoResetEvent(false);
        var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));

        events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());

        var subject = new Subject<int>();
        var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));

        Console.WriteLine("Subscribing to events...");

        events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
        Console.WriteLine("Subscribed.");

        completed.Subscribe(x => Console.WriteLine("Completed"));

        subject.OnNext(10);

        are.WaitOne();
        Console.WriteLine("Done");

Я знаю о различных Buffer методах, но они не кажутся подходящими в этом случае, так как я здесь не буферирую, а просто координирую действия в начале моей подписки.

UPDATE

Я обобщил ответ Enigmativity на следующий метод расширения, который может быть полезен:

public static class ObservableEx
{
    public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
    {
        var observable = Observable.Create<TSource>(o =>
        {
            var replaySubject = new ReplaySubject<TSource>();
            var sub1 = source.Subscribe(replaySubject);
            var query =
                completed.Take(1).Select(
                    x => replaySubject.AsObservable());
            var sub2 = query.Switch().Subscribe(o);
            return new CompositeDisposable(sub1, sub2);
        });
        return observable;
    }        
}

1 Ответ

4 голосов
/ 20 сентября 2011

Это работает для меня:

var r = Observable.Create<int>(o =>
{
    var rs = new ReplaySubject<int>();
    var subscription1 = s1.Subscribe(rs);
    var query = from f in s2.Take(1) select rs.AsObservable();
    var subscription2 = query.Switch().Subscribe(o);
    return new CompositeDisposable(subscription1, subscription2);
});
...