Объединить потоки с максимальным интервалом времени между значениями - PullRequest
0 голосов
/ 25 января 2012

(Использование оператора Zip в Reactive Extensions (Rx) )

Объединение потоковых пар в одну без тайм-аута

        var xyZipped = xStream.Zip(yStream, (x, y) =>
        {
            Debug.WriteLine("Latest Pair Has Arrived");
            return new List<SomeType> { x, y };
        });
  • Но как вы могли бы ввести максимально допустимый интервал времени между двумя значениями в каждом потоке, чтобы при превышении интервала между значениями никакое значение не выводилось бы из xyZipped

  • И если между двумя значениями проходит слишком длинный , то спаривание также должно быть сброшено , т. Е. Для другого спаривания, которое должно произойти после тайм-аута, новое значение должно быть получено в каждом из потоков (не только один).

  • Или было бы лучше использовать другой оператор / реализацию для достижения такой логики потока?

1 Ответ

1 голос
/ 25 января 2012

Вы можете просто использовать Rx комбинаторы. Поскольку ваша основная цель - Zip, давайте начнем с Zip, а затем применим условия истечения срока действия.

public static IObservable<TOut> ZipWithExpiry<TLeft, TRight, TOut>(
                    IObservable<TLeft> left, 
                    IObservable<TRight> right, 
                    Func<TLeft, TRight, TOut> selector, 
                    TimeSpan validity)
        {
            return Observable.Zip(left.Timestamp(), right.Timestamp(), (l, r) => Tuple.Create(l, r))
                             .Where(tuple => Math.Abs((tuple.Item1.Timestamp - tuple.Item2.Timestamp).TotalSeconds) < validity.TotalSeconds)
                             .Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value));
        }

Если вы хотите проверить соседние значения в потоке, вы можете переписать его, используя оператор TimeInterval вместо Timestamp.

...