Группировка IObservable по свойству и времени ожидания - PullRequest
0 голосов
/ 22 февраля 2019

Мне нужно преобразовать поток объектов в поток пакетов объектов, сгруппировав их по значению свойства, используя Reactive Extensions :

class Record
{
    public string Group;
    public int Value;
}

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    // ...
}

Пакет завершен и отправленв выходной поток, когда происходит одно из следующих событий:

  • Новый объект поступает из исходного потока, и его значение Group отличается от предыдущего значения
  • В этом объекте не было новых объектов.исходный поток в течение N секунд

Например, если a1 означает new Record { Group = "a", Value = 1}:

input:   -a1-a2-a3-b1-b2-
output:  -[a1, a2, a3]-[b1, b2]-

input:   -a1-a2----------a3-
output:  -[a1, a2]-------[a3]-

Пробовал различные комбинации GroupByUntil, Debounce,Buffer и Timer безрезультатно.Как это сделать?

1 Ответ

0 голосов
/ 22 февраля 2019

Хитрость заключалась в том, чтобы использовать GroupByUntil с Throttle на себе:

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    return source.GroupByUntil(x => x.Group, g => g.Throttle(timeout))
                 .SelectMany(x => x.ToList());
}
...