Я не думаю, что вы можете создать оператор Rx, комбинируя существующие операторы, потому что, по сути, вы хотите использовать оператор Where
, но после его совпадения вы хотите «отключить» его для следующих N элементов.Хорошо, по-видимому, вы можете использовать оператор Repeat
, и это доказывает, насколько составным является Rx.
В любом случае, вы также можете создать новый оператор, используя лучшие методы для создания собственного оператора Rx:
static class Extensions {
public static IObservable<T> WhereThenTake<T>(
this IObservable<T> source,
Predicate<T> predicate,
Int32 count
) {
if (source == null)
throw new ArgumentNullException("source");
if (predicate == null)
throw new ArgumentNullException("predicate");
if (count < 0)
throw new ArgumentException("count");
return Observable.Create<T>(
observer => {
var finished = false;
var n = 0;
var disposable = source.Subscribe(
x => {
if (!finished) {
if (n > 0) {
observer.OnNext(x);
n -= 1;
}
else if (predicate(x)) {
n = count;
observer.OnNext(x);
}
}
},
ex => { finished = true; observer.OnError(ex); },
() => { finished = true; observer.OnCompleted(); }
);
return disposable;
}
);
}
}
Затем вы используете его следующим образом (Evaluate
- ваш предикат, а n
- количество элементов, которые необходимо пройти после совпадения предиката):
changes.WhereThenTake(Evaluate, n).Subscribe( ... );