Насколько я знаю, "официальной" реализации нет. В основном вам нужно будет собрать значения из двух источников для хранения и проверки совпадений в противоположном источнике. Как-то так должно начаться:
<Extension()>
Public Function Intersect(Of T)(first As IObservable(Of T),
second As IObservable(Of T),
comparer As IEqualityComparer(Of T)
) As IObservable(Of T)
If first Is Nothing Then Throw New ArgumentNullException("first")
If second Is Nothing Then Throw New ArgumentNullException("second")
If comparer Is Nothing Then Throw New ArgumentException("comparer")
Return Observable.Create(Of T)(
Function(obs)
Dim gate As New Object()
Dim firstItems As New HashSet(Of T)(comparer)
Dim secondItems As New HashSet(Of T)(comparer)
Dim firstCompleted, secondCompleted As Boolean
Dim disp As New CompositeDisposable(2)
disp.Add(first.Subscribe(Sub(v)
SyncLock gate
firstItems.Add(v)
If secondItems.Contains(v) Then obs.OnNext(v)
End SyncLock
End Sub,
AddressOf obs.OnError,
Sub()
SyncLock gate
firstCompleted = True
If secondCompleted Then obs.OnCompleted()
End SyncLock
End Sub))
disp.Add(second.Subscribe(Sub(v)
SyncLock gate
secondItems.Add(v)
If firstItems.Contains(v) Then obs.OnNext(v)
End SyncLock
End Sub,
AddressOf obs.OnError,
Sub()
SyncLock gate
secondCompleted = True
If firstCompleted Then obs.OnCompleted()
End SyncLock
End Sub))
Return disp
End Function)
End Function
Эта реализация будет повторять совпадения, если вход содержит несколько вхождений, но только после того, как он будет найден в обоих источниках. Например
first ----1---2---1----2---1---1----|
second ----------2----1-----------|
out ----------2----1-2---1---1----|
Если повторы нежелательны, вы можете проверить, что их нет в соответствующей исходной коллекции. Подписка на первый изменится на:
first.Subscribe(Sub(v)
SyncLock gate
'check that the first doesn't already contain this value
If firstItems.Add(v) AndAlso
secondItems.Contains(v) Then obs.OnNext(v)
End SyncLock
End Sub,
AddressOf obs.OnError,
Sub()
SyncLock gate
firstCompleted = True
If secondCompleted Then obs.OnCompleted()
End SyncLock
End Sub)
со второй подпиской, изменяющейся аналогично.