Можно ли Observable.Buffer на что-то, кроме времени - PullRequest
7 голосов
/ 05 марта 2012

Я искал примеры того, как использовать Observable.Buffer в rx, но не могу найти ничего более существенного, чем буферизованное время.

Кажется, существует перегрузка для указания«bufferClosingSelector», но я не могу обернуться вокруг этого.

Я пытаюсь создать последовательность, которая буферизуется по времени или по «накоплению».Рассмотрим поток запросов, в котором каждый запрос имеет некоторый вес, и я не хочу обрабатывать больше, чем x накопленного веса за раз, или, если недостаточно накопленного, просто дайте мне то, что пришло за последний период (обычная функция буфера))

1 Ответ

15 голосов
/ 06 марта 2012

bufferClosingSelector - это функция, вызываемая каждый раз для получения Observable, которое будет выдавать значение, когда ожидается закрытие буфера.

Например,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) работает как обычная Buffer(time) перегрузка.

Если вы хотите взвесить последовательность, вы можете применить Scan к последовательности и затем принять решение о вашем условии агрегирования.

Например, source.Scan((a,c) => a + c).SkipWhile(a => a < 100) дает вам последовательность, которая выдает значение, когда исходная последовательность суммируется до более чем 100.

Вы можете использовать Amb, чтобы участвовать в гонке между этими двумя условиями закрытия, чтобы увидеть, какая из них реагирует первой:

        .Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

Вы можете использовать любую серию комбинаторов, которая выдает любое значение для буфера, который будет закрыт в этой точке.

Примечание: Значение, данное закрывающему селектору, не имеет значения - имеет значение уведомление. Поэтому для объединения источников разных типов с Amb просто измените его на System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
...