ReactiveExtensions BufferWithPredicate - PullRequest
3 голосов
/ 28 мая 2011

Rx имеет методы для BufferWithTime , BufferWithCount и BufferWithTimeOrCount , я хочу написать метод BufferWithPredicate, который будет выглядеть так:

public IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)

По сути, новые элементы будут добавлены в существующий буфер, если только предикат не возвращает false, в этом случае будет возвращен буфер и будет запущен новый.Предикат принимает следующий элемент и буфер в качестве параметров.

Как этого добиться?

1 Ответ

3 голосов
/ 29 мая 2011

Это должно сделать это для вас.Я использую Observable.Defer, чтобы он работал и с холодными наблюдаемыми:

public static class MyObservableExtensions
{
    public static IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)
    {
        return Observable.Defer(() =>
            {
                var result = new Subject<IList<T>>();
                var list = new List<T>();
                input.Subscribe(item =>
                    {
                        if (predicate(item, list))
                        {
                            list.Add(item);
                        }
                        else
                        {
                            result.OnNext(list);
                            list = new List<T>();
                            list.Add(item);
                        }
                    }, 
                    () => result.OnNext(list));
                return result;
            });
    }
}

Использование:

var observable = new[] { 2, 4, 6, 8, 10, 12, 13, 14, 15 }.ToObservable();
var result = observable.BufferWithPredicate((item, list) => item % 2 == 0);
result.Subscribe(l => Console.WriteLine("New list arrived. Count = {0}", l.Count));

Вывод:

"New list arrived. Count = 6" 
"New list arrived. Count = 3"
...