Реактивное накопление данных, поступающих в пакетах - PullRequest
0 голосов
/ 17 марта 2020

Проблема: существует поток числовых c значений. Значения вводятся в виде пакетов, поэтому 100 значений могут очень близко подходить друг к другу (по времени), скажем, каждые 5-10 мс, а затем, возможно, на некоторое время останавливаться, а затем снова вспыхивать. Идея состоит в том, чтобы показать накопленное значение (сумму) windows длиной не более 500 мс.

Моя первая попытка была с буфером (500 мс), но это вызывает постоянную накачку событий (каждые 500 мс) с суммой 0 (в качестве накопленного буфера). пункты 0), это можно исправить с помощью фильтрации по пустым буферам, но я действительно хотел бы избежать этого полностью и открывать буферизацию только после того, как значение фактически выдвинуто после периода "тишины".

Дополнительные ограничения : реализация UniRx, которая не содержит всех операторов Rx, особенно Window (что, я подозреваю, может быть полезно в этом случае), поэтому решение ограничено базовыми c операторами, включая Buffer.

1 Ответ

2 голосов
/ 19 марта 2020

Поскольку вы просто хотите получить сумму, использование Buffer является излишним. Мы можем запустить Scan или Aggregation.

  var burstSum =
    source
        .Scan(0, (acc, current) => acc + current)
        .Throttle(TimeSpan.FromMilliseconds(500))
        .Take(1)
        .Repeat();

. Это запустит поток, который накапливает сумму до тех пор, пока поток не будет простаивать в течение как минимум 500 мс.

Но если мы Если вы хотите генерировать хотя бы каждый раз ведро, нам придется go другой путь. Мы делаем два предположения:

  1. Сумма временных интервалов между элементами должна быть равна временному интервалу между первым и последним элементом.
  2. Throttle освободит последнее значение после завершения потока.

    source
        .TimeInterval()
        .Scan((acc, cur) => new TimeInterval<int>(acc.Value + cur.Value, acc.Interval + cur.Interval))
        .TakeWhile(acc => acc.Interval <= TimeSpan.FromMilliseconds(500))
        .Throttle(TimeSpan.FromMilliseconds(500))
        .Select(acc => acc.Value)
        .Take(1)
        .Repeat();
    
...