Мне нужно преобразовать поток объектов в поток пакетов объектов, сгруппировав их по значению свойства, используя Reactive Extensions :
class Record
{
public string Group;
public int Value;
}
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
// ...
}
Пакет завершен и отправленв выходной поток, когда происходит одно из следующих событий:
- Новый объект поступает из исходного потока, и его значение
Group
отличается от предыдущего значения - В этом объекте не было новых объектов.исходный поток в течение
N
секунд
Например, если a1
означает new Record { Group = "a", Value = 1}
:
input: -a1-a2-a3-b1-b2-
output: -[a1, a2, a3]-[b1, b2]-
input: -a1-a2----------a3-
output: -[a1, a2]-------[a3]-
Пробовал различные комбинации GroupByUntil
, Debounce
,Buffer
и Timer
безрезультатно.Как это сделать?