Я хочу:
- Немедленно подпишитесь на
IObservable<T>
, но немедленно начните буферизацию любого T
, который получен (т.е. еще не виден моим IObserver<T>
).
- Выполни работу.
- Когда работа будет завершена, очистите буфер до моего
IObserver<T>
и продолжайте
Очень важно, чтобы подписка - это первое, что происходит.
В форме «мраморной диаграммы» я ищу что-то вроде этого ...
Time T+1 2 3 4 5 6 7 8
s1:IObservable<int> 1 2 3 4 5 6 7 8
s2:IObservable<bool> t
r: IObservable<int> 1 3 4 5 6 7 8
2
... в том, что в момент T + 1 я подписываюсь на IObservable<bool>
r
, который сам зависит от IObservable<int>
s1
и IObservable<bool>
s2
. s1
- это поток, который я не контролирую, s2
- это поток, который я контролирую (субъект), и publish
, когда работа завершена.
Я думал, что SkipUntil
поможет мне, но это не буферизирует события, полученные до завершения зависимого IObservable
.
Вот код, который, как я думал, будет работать, но не из-за того, что SkipUntil
не является буфером.
var are = new AutoResetEvent(false);
var events = Observable.Generate(1, i => i < 12, i => i + 1, i => i, i => TimeSpan.FromSeconds(1));
events.Subscribe(x => Console.WriteLine("events:" + x), () => are.Set());
var subject = new Subject<int>();
var completed = subject.AsObservable().Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Subscribing to events...");
events.SkipUntil(completed).Subscribe(x=> Console.WriteLine("events.SkipUntil(completed):"+ x));
Console.WriteLine("Subscribed.");
completed.Subscribe(x => Console.WriteLine("Completed"));
subject.OnNext(10);
are.WaitOne();
Console.WriteLine("Done");
Я знаю о различных Buffer
методах, но они не кажутся подходящими в этом случае, так как я здесь не буферирую, а просто координирую действия в начале моей подписки.
UPDATE
Я обобщил ответ Enigmativity на следующий метод расширения, который может быть полезен:
public static class ObservableEx
{
public static IObservable<TSource> BufferUntil<TSource, TCompleted>(this IObservable<TSource> source, IObservable<TCompleted> completed)
{
var observable = Observable.Create<TSource>(o =>
{
var replaySubject = new ReplaySubject<TSource>();
var sub1 = source.Subscribe(replaySubject);
var query =
completed.Take(1).Select(
x => replaySubject.AsObservable());
var sub2 = query.Switch().Subscribe(o);
return new CompositeDisposable(sub1, sub2);
});
return observable;
}
}