Как отслеживать IObservable <double>за пределами диапазона в течение длительного периода времени - PullRequest
2 голосов
/ 10 января 2020

У меня есть IObservable<double>, который дает значения, считанные с датчика через несколько регулярный интервал. Я хочу сигнализировать, когда значение датчика выходит за пределы в течение длительного периода времени.

В качестве конкретного примера представьте себе наблюдаемый датчик температуры. Я хочу следить за тем, когда температура превышает 100 ° C в течение 5 секунд. То есть из IObservable<double> производят наблюдаемое или событие, которое срабатывает один раз, когда температура превышает 100 ° C в течение 5 секунд. Если для любого количества образцов значение температуры выше 100 ° C менее 5 секунд, это не имеет значения. Через 5 секунд после первого набора сэмплов выше 100 ° C он должен поднять сигнал. Если значение продолжает оставаться выше 100 ° C, сигнал не следует повышать до тех пор, пока температура не упадет ниже 100 ° C, а затем снова не превысит его в течение 5 секунд.

Кажется, что это должно быть простым делать с реактивными расширениями, но, будучи новичком в этом, я не могу ничего найти. Я просмотрел http://reactivex.io и http://introtorx.com, но ничего не нашел. Возможно, я просто не знаю правильный термин для поиска.

Ответы [ 3 ]

2 голосов
/ 19 января 2020

Я немного подумал об этом и понял, что мы можем его переосмыслить.

Давайте начнем с наблюдаемого порога:

var threshold = 
source
.Select(temp => temp > 100)
.DistinctUntilChanged();

Это дает значение, когда есть изменить выше 100 или ниже. Если он продолжает быть выше 100 или остается ниже него, он не создает никаких новых значений.

Теперь давайте определим:

var alarmUp =
    threshold
    .Throttle(TimeSpan.FromSeconds(5))
    .Where(cond => cond == true);

Здесь, если условие не меняется в течение 5 секунд, оператор throttle выплевывает значение. Теперь это может быть true (> 100) в течение 5 секунд или false (<100). Мы заинтересованы только в том, чтобы это было <code>true (> 100).

В промежутке, если есть какие-либо изменения, оператор дроссельной заслонки сбрасывается, поэтому условие должно сохраняться не менее 5 секунд.

1 голос
/ 20 января 2020

Вы должны использовать .Switch() для ситуаций, когда поступающее новое значение заставляет вас игнорировать результаты любых предыдущих введенных значений.

Вот запрос, который вам нужен:

IObservable<Unit> query =
    source
        .Select(x => x > 100.0)
        .DistinctUntilChanged()
        .Select(x => x
            ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default)
            : Observable.Never<Unit>())
        .Switch();

Комбинация .Select(x => x >= 100.0).DistinctUntilChanged() изменяет источник на IObservable<bool>, который срабатывает только тогда, когда датчик переворачивает между x > 100.0 и x <= 100.0 - true при превышении 100 ° C и ложном для 100 ° C или меньше.

Теперь мы превращаем IObservable<bool> в IObservable<IObservable<Unit>>. Это наблюдаемая, которая производит другие наблюдаемые. Когда мы получаем true, я хочу вернуть IObservable<Unit>, который срабатывает через 5.0 секунд, а когда мы получаем false, я хочу вернуть наблюдаемое, которое вообще не возвращает значения.

Это то, что делает .Select(x => x ? Observable.Timer(TimeSpan.FromSeconds(5.0)).Select(x => Unit.Default) : Observable.Never<Unit>()).

Наконец, мы вставляем .Switch(), который изменяет IObservable<IObservable<Unit>> на IObservable<Unit>, производя только значения, основанные на последнем внутреннем произведенном IObservable<Unit>. Другими словами, если датчик переворачивается сверху 100.0 ниже снова через 5.0 секунд, то он игнорирует значение Observable.Timer(TimeSpan.FromSeconds(5.0)) и ожидает значения от Observable.Never<Unit>(). Если оно превышает 100.0 более чем на 1036 *, то срабатывает Observable.Timer(TimeSpan.FromSeconds(5.0)) и вы получаете Unit, полученный по вашему запросу.

Это ведет себя именно так, как вы хотели.

Вот несколько более простая версия запроса:

IObservable<Unit> query =
    source
        .Select(x => x > 100.0)
        .DistinctUntilChanged()
        .Select(x => x
            ? Observable.Timer(TimeSpan.FromSeconds(5.0))
            : Observable.Never<long>())
        .Switch()
        .Select(x => Unit.Default);
0 голосов
/ 10 января 2020

Должно быть возможно сделать это, используя встроенные операторы (функциональный стиль), но это более просто, если реализовать пользовательский оператор с императивной логикой c.

public static IObservable<Unit> Alarm<T>(this IObservable<T> source,
    T threshold, TimeSpan delay, IComparer<T> comparer = null)
{
    comparer = comparer ?? Comparer<T>.Default;
    return Observable.Create<Unit>(o =>
    {
        Stopwatch stopwatch = new Stopwatch();
        int alarmState = 0; // 0: OK, 1: above threshold, 2: signal transmitted

        return source.Subscribe(x =>
        {
            if (comparer.Compare(x, threshold) >= 0)
            {
                if (alarmState == 0)
                {
                    alarmState = 1;
                    stopwatch.Restart();
                }
                else if (alarmState == 1 && stopwatch.Elapsed >= delay)
                {
                    alarmState = 2;
                    o.OnNext(Unit.Default);
                }
            }
            else
            {
                alarmState = 0;
            }
        }, o.OnError, o.OnCompleted);
    });
}
...