Оператор Join
- это то, что вы ищете.
Исходя из вашего рисунка, я предположил, что вы хотите, чтобы окно было фиксированной продолжительностью от первого последовательного высокого или низкого значения. Функция RisingEdges
(лучшее название, которое я смог придумать в короткие сроки) фильтрует источник, наблюдаемый по этим первым последовательным значениям.
<Extension()>
Public Function RisingEdges(Of T)(source As IObservable(Of T),
level As Func(Of T, Boolean)
) As IObservable(Of T)
Return source.Select(Function(v) Tuple.Create(v, level(v))) _
.DistinctUntilChanged(Function(vb) vb.Item2) _
.Where(Function(vb) vb.Item2) _
.Select(Function(vb) vb.Item1)
End Function
Оттуда вы можете использовать оператор Join
, чтобы найти случаи, когда эти края расположены близко друг к другу.
<Extension()>
Public Function FindClosePeaks(Of T)(source As IObservable(Of T),
high As Func(Of T, Boolean),
low As Func(Of T, Boolean),
windowDuration As TimeSpan
) As IObservable(Of Tuple(Of T, T))
Return Observable.Join(source.RisingEdges(high),
source.RisingEdges(low),
Function(h) Observable.Timer(windowDuration),
Function(l) Observable.Timer(windowDuration),
Function(h, l) Tuple.Create(h, l))
End Function
Остерегайтесь использования этой функции с холодными наблюдаемыми, поскольку на нее будет две подписки. В зависимости от источника, это может быть приемлемо, или вам может потребоваться сделать его «горячим» с помощью Publish
.
Мне не совсем понятно, что вы искали в качестве конечного результата, но, надеюсь, это должно приблизить вас к тому, что вы хотите.