Rx действительно хорошо подходит для этой проблемы IMO.
IObservables
не может 'OrderBy' по очевидным причинам (сначала вам нужно было бы наблюдать весь поток, чтобы гарантировать правильный порядок вывода),поэтому мой ответ ниже предполагает (что вы заявили), что ваши 2 исходных потока событий находятся в порядке.
В конце концов, это была интересная проблема.В стандартных операторах Rx отсутствует GroupByUntilChanged
, который мог бы легко это решить, поскольку он вызывал OnComplete
в предыдущей группе, наблюдаемой при наблюдении первого элемента следующей группы.Однако, глядя на реализацию DistinctUntilChanged
, она не следует этому шаблону и вызывает OnComplete
только тогда, когда завершается исходная наблюдаемая (даже несмотря на то, что она знает, что не будет больше элементов после первого не различимого элемента ... странно???).В любом случае, по этим причинам я решил отказаться от метода GroupByUntilChanged
(чтобы не нарушать соглашения Rx) и вместо этого пошел на ToEnumerableUntilChanged
.
Отказ от ответственности: Это мое первое расширение Rx, поэтому буду признателен за отзыв о моемвыбор сделан.Кроме того, одной из моих главных проблем является анонимная наблюдаемая, содержащая список distinctElements
.
Во-первых, код вашего приложения довольно прост:
public class Event
{
public DateTime Timestamp { get; set; }
}
private IObservable<Event> eventStream1;
private IObservable<Event> eventStream2;
public IObservable<IEnumerable<Event>> CombineAndGroup()
{
return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
.ToEnumerableUntilChanged(e => e.Timestamp);
}
Теперь для реализации ToEnumerableUntilChanged
(Стена кода предупреждения):
public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
{
// TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
var comparer = EqualityComparer<TKey>.Default;
return Observable.Create<IEnumerable<TSource>>(observer =>
{
var currentKey = default(TKey);
var hasCurrentKey = false;
var distinctElements = new List<TSource>();
return source.Subscribe((value =>
{
TKey elementKey;
try
{
elementKey = keySelector(value);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (!hasCurrentKey)
{
hasCurrentKey = true;
currentKey = elementKey;
distinctElements.Add(value);
return;
}
bool keysMatch;
try
{
keysMatch = comparer.Equals(currentKey, elementKey);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (keysMatch)
{
distinctElements.Add(value);
return;
}
observer.OnNext( distinctElements);
distinctElements.Clear();
distinctElements.Add(value);
currentKey = elementKey;
}), observer.OnError, () =>
{
if (distinctElements.Count > 0)
observer.OnNext(distinctElements);
observer.OnCompleted();
});
});
}