Вот что я получил с помощью RX Forum:
Идея состоит в том, чтобы выпустить серию «билетов» на оригинальную последовательность, которую нужно запустить.Эти «билеты» задерживаются на время ожидания, за исключением самого первого, который сразу же предшествует последовательности билетов.Когда приходит событие и происходит ожидание билета, событие срабатывает немедленно, в противном случае оно ждет до билета, а затем срабатывает.Когда он срабатывает, выдается следующий билет и так далее ...
Чтобы объединить билеты и оригинальные события, нам нужен комбинатор.К сожалению, «стандартный» .CombineLatest не может быть использован здесь, потому что он будет запускать билеты и мероприятия, которые использовались ранее.Поэтому мне пришлось создать свой собственный комбинатор, который в основном является фильтрованным .CombineLatest, который срабатывает только тогда, когда оба элемента в комбинации «свежие» - никогда не возвращались раньше.Я называю это .CombineVeryLatest aka .BrokenZip;)
Используя .CombineVeryLatest, приведенная выше идея может быть реализована следующим образом:
public static IObservable<T> SampleResponsive<T>(
this IObservable<T> source, TimeSpan delay)
{
return source.Publish(src =>
{
var fire = new Subject<T>();
var whenCanFire = fire
.Select(u => new Unit())
.Delay(delay)
.StartWith(new Unit());
var subscription = src
.CombineVeryLatest(whenCanFire, (x, flag) => x)
.Subscribe(fire);
return fire.Finally(subscription.Dispose);
});
}
public static IObservable<TResult> CombineVeryLatest
<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
var ls = leftSource.Select(x => new Used<TLeft>(x));
var rs = rightSource.Select(x => new Used<TRight>(x));
var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
var fltCmb = cmb
.Where(a => !(a.x.IsUsed || a.y.IsUsed))
.Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
}
private class Used<T>
{
internal T Value { get; private set; }
internal bool IsUsed { get; set; }
internal Used(T value)
{
Value = value;
}
}
Редактировать: вот еще один более компактный вариант CombineVeryLatest, предложенный AndreasKöpf на форуме:
public static IObservable<TResult> CombineVeryLatest
<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
return Observable.Defer(() =>
{
int l = -1, r = -1;
return Observable.CombineLatest(
leftSource.Select(Tuple.Create<TLeft, int>),
rightSource.Select(Tuple.Create<TRight, int>),
(x, y) => new { x, y })
.Where(t => t.x.Item2 != l && t.y.Item2 != r)
.Do(t => { l = t.x.Item2; r = t.y.Item2; })
.Select(t => selector(t.x.Item1, t.y.Item1));
});
}