Rx Intersect оператор - PullRequest
       1

Rx Intersect оператор

1 голос
/ 03 апреля 2012

Я начну с самой простой излагаемой части моего вопроса: есть ли доступная реализация оператора Rx Intersect?

В основном у меня есть два потока, которые будут генерировать значения.Допустим, поток 1 производит: A, B, C, D, E, F, G И поток 2 производит: B, D, F

Оба потока завершатся и не будут бесконечными (для фона: они предоставляютсядвумя разными источниками данных, которые мы запрашивали одновременно).

Есть ли у кого-нибудь рекомендации по реализации асинхронного оператора пересечения в мире Rx?

Ответы [ 3 ]

4 голосов
/ 04 апреля 2012

Вот другая реализация, которая работает для горячих наблюдаемых, она дает вам заданное пересечение, поэтому, если 'c' появляется три раза в обоих потоках, оно будет появляться только один раз в потоке пересечения.

IObservable<char> stream1;
IObservable<char> stream2;

var intersect = Observable
  .Merge(stream1.Distinct(), stream2.Distinct())
  .GroupBy(c=>c)
  .SelectMany(g=>g.Skip(1).Take(1));

Пример:

stream1    ---a---b--c--d--e--a-b-c
stream2    -b---a---e---------a-b-c
intersect  -----a-b--------e------c
3 голосов
/ 03 апреля 2012

Насколько я знаю, "официальной" реализации нет. В основном вам нужно будет собрать значения из двух источников для хранения и проверки совпадений в противоположном источнике. Как-то так должно начаться:

<Extension()> 
Public Function Intersect(Of T)(first As IObservable(Of T), 
                                second As IObservable(Of T), 
                                comparer As IEqualityComparer(Of T)
                               ) As IObservable(Of T)
    If first Is Nothing Then Throw New ArgumentNullException("first")
    If second Is Nothing Then Throw New ArgumentNullException("second")
    If comparer Is Nothing Then Throw New ArgumentException("comparer")

    Return Observable.Create(Of T)(
        Function(obs)
            Dim gate As New Object()
            Dim firstItems As New HashSet(Of T)(comparer)
            Dim secondItems As New HashSet(Of T)(comparer)
            Dim firstCompleted, secondCompleted As Boolean

            Dim disp As New CompositeDisposable(2)
            disp.Add(first.Subscribe(Sub(v)
                                         SyncLock gate
                                             firstItems.Add(v)
                                             If secondItems.Contains(v) Then obs.OnNext(v)
                                         End SyncLock
                                     End Sub,
                                     AddressOf obs.OnError,
                                     Sub()
                                         SyncLock gate
                                             firstCompleted = True
                                             If secondCompleted Then obs.OnCompleted()
                                         End SyncLock
                                     End Sub))
            disp.Add(second.Subscribe(Sub(v)
                                          SyncLock gate
                                              secondItems.Add(v)
                                              If firstItems.Contains(v) Then obs.OnNext(v)
                                          End SyncLock
                                      End Sub,
                                      AddressOf obs.OnError,
                                      Sub()
                                          SyncLock gate
                                              secondCompleted = True
                                              If firstCompleted Then obs.OnCompleted()
                                          End SyncLock
                                      End Sub))
            Return disp
        End Function)
End Function

Эта реализация будет повторять совпадения, если вход содержит несколько вхождений, но только после того, как он будет найден в обоих источниках. Например

first  ----1---2---1----2---1---1----|
second ----------2----1-----------|
out    ----------2----1-2---1---1----|

Если повторы нежелательны, вы можете проверить, что их нет в соответствующей исходной коллекции. Подписка на первый изменится на:

first.Subscribe(Sub(v)
                    SyncLock gate
                        'check that the first doesn't already contain this value
                        If firstItems.Add(v) AndAlso
                           secondItems.Contains(v) Then obs.OnNext(v)
                    End SyncLock
                End Sub,
                AddressOf obs.OnError,
                Sub()
                    SyncLock gate
                        firstCompleted = True
                        If secondCompleted Then obs.OnCompleted()
                    End SyncLock
                End Sub)

со второй подпиской, изменяющейся аналогично.

1 голос
/ 03 апреля 2012

Разве вы не можете просто сделать:

var intersect = from x in stream1
                from y in stream2
                where x == y
                select x;
...