Как вы буферизуете элементы в группы в Reactive Extensions? - PullRequest
3 голосов
/ 06 августа 2011

У меня есть IObservable;где изменение свойства имеет идентификатор объекта и PropertyName.Я хочу использовать это для обновления базы данных, но если несколько свойств изменяются почти одновременно, я хочу сделать только одно обновление для всех свойств одного и того же объекта.

Если это был статический IEnumerable, и я использовал LINQ Iможно просто использовать:

MyList.GroupBy(C=>C.EntityID);

Однако список никогда не заканчивается (никогда не вызывает IObserver.OnComplete).То, что я хочу сделать, это подождать некоторый период времени, скажем, 1 секунду, сгруппировать все вызовы соответствующим образом на эту одну секунду.

В идеале у меня должны быть отдельные счетчики для каждого EntityID, и они будут сбрасываться всякий раз, когдановое изменение свойства было найдено для этого EntityID.

Я не могу использовать что-то вроде Throttle, потому что я хочу обрабатывать все изменения свойств, я просто хочу обрабатывать их вместе за один раз.

Ответы [ 2 ]

7 голосов
/ 06 августа 2011

Вот, пожалуйста:

MyObservable
    .Buffer(TimeSpan.FromSeconds(1.0))
    .Select(MyList =>
        MyList.GroupBy(C=>C.EntityID));
2 голосов
/ 06 августа 2011

Метод Buffer , кажется, делает то, что вы хотите.Дайте ему TimeSpan, и он свернет все сообщения в список.Существует также метод Window , который делает нечто подобное, но я не совсем уверен, какой может быть его семантика.

...