Rx фильтр по критерию и n значений после критерия - PullRequest
5 голосов
/ 03 марта 2012

Фильтровать только по критерию

changes.Where(p => Evaluate(p)).Subscribe(p => { // Do something });

Но как сделать так, чтобы вы получали значение критерия и n значений после того, как критерий выполнен (и эти значения n не обязательно должны совпадать?)критерий оценки)?

  • например, я хотел бы подписаться на поток, который возвращает Evaluate(p) и одно значение после этого (а затем снова начинает оценивать p)

Ответы [ 3 ]

4 голосов
/ 03 марта 2012

Пожалуйста, посмотрите SkipWhile и Взять методы расширения IEnumerable . Вы можете попробовать следующий код:

changes.SkipWhile(change => Evaluate(change) == false).Take(n).Subscribe(change => { /* do something */ });

Редактировать

Новый код для получения всех подходящих предметов из последовательности с n хвостом предметов (без повторного получения предметов)

// Let's assume elements in the sequence are of type Change
int i = 0;
Func<Change, bool> evaluateWithTail = change =>
{
    if (i <= 0 || i > n)
    {
        i = Evaluate(change) ? 1 : 0;
    }
    else
    {
        i++;
    }

    return i > 0;
}
// Please note delegate is specified as parameter directly - without lambda expression
changes.Where(evaluateWithTail).Subscribe(change => { /* do something */ });
3 голосов
/ 05 марта 2012

Вот еще одна реализация, которая немного короче:

var filtered = source
  .SkipWhile( x => !Criteria(x) )
  .Take(3)
  .Repeat()
2 голосов
/ 03 марта 2012

Я не думаю, что вы можете создать оператор Rx, комбинируя существующие операторы, потому что, по сути, вы хотите использовать оператор Where, но после его совпадения вы хотите «отключить» его для следующих N элементов.Хорошо, по-видимому, вы можете использовать оператор Repeat, и это доказывает, насколько составным является Rx.

В любом случае, вы также можете создать новый оператор, используя лучшие методы для создания собственного оператора Rx:

static class Extensions {

  public static IObservable<T> WhereThenTake<T>(
    this IObservable<T> source,
    Predicate<T> predicate,
    Int32 count
  ) {
    if (source == null)
      throw new ArgumentNullException("source");
    if (predicate == null)
      throw new ArgumentNullException("predicate");
    if (count < 0)
      throw new ArgumentException("count");
    return Observable.Create<T>(
      observer => {
        var finished = false;
        var n = 0;
        var disposable = source.Subscribe(
          x => {
            if (!finished) {
              if (n > 0) {
                observer.OnNext(x);
                n -= 1;
              }
              else if (predicate(x)) {
                n = count;
                observer.OnNext(x);
              }
            }
          },
          ex => { finished = true; observer.OnError(ex); },
          () => { finished = true; observer.OnCompleted(); }
        );
        return disposable;
      }
    );
  }

}

Затем вы используете его следующим образом (Evaluate - ваш предикат, а n - количество элементов, которые необходимо пройти после совпадения предиката):

changes.WhereThenTake(Evaluate, n).Subscribe( ... );
...