bufferClosingSelector
- это функция, вызываемая каждый раз для получения Observable, которое будет выдавать значение, когда ожидается закрытие буфера.
Например,
source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))
работает как обычная Buffer(time)
перегрузка.
Если вы хотите взвесить последовательность, вы можете применить Scan
к последовательности и затем принять решение о вашем условии агрегирования.
Например, source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
дает вам последовательность, которая выдает значение, когда исходная последовательность суммируется до более чем 100.
Вы можете использовать Amb
, чтобы участвовать в гонке между этими двумя условиями закрытия, чтобы увидеть, какая из них реагирует первой:
.Buffer(() => Observable.Amb
(
Observable.Timer(TimeSpan.FromSeconds(1)),
source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
)
)
Вы можете использовать любую серию комбинаторов, которая выдает любое значение для буфера, который будет закрыт в этой точке.
Примечание:
Значение, данное закрывающему селектору, не имеет значения - имеет значение уведомление. Поэтому для объединения источников разных типов с Amb
просто измените его на System.Reactive.Unit
.
Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())