Я попытался сделать оператор SerialGroupBy для Rx.Net.Цель оператора - работать как GroupBy, но каждый раз, когда создается новая группа, первая завершается.Таким образом, одновременно не может быть открыто более одной группы.
Моя текущая "лучшая" реализация выглядит примерно так:
public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
stream.Publish(shared =>
shared.GroupByUntil(keySelector, elementSelector, group =>
shared.DistinctUntilChanged(keySelector)));
И я надеялся закрыть группу до того, как следующая группа начнется, как показано здесь:
[Fact]
public void SerialGroupBy()
{
var scheduler = new TestScheduler();
var stream = scheduler.CreateHotObservable(
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
var observer = scheduler.CreateObserver<string>();
stream.SerialGroupBy(x => x.Length, x => x)
.Select(x => x.Subscribe(observer))
.Subscribe();
scheduler.Start();
observer.Messages.ShouldBeLike(
OnNext(201, "First group"),
OnCompleted<string>(202),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
}
Но завершение первых групп приходит слишком поздно, например:
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));
Я могу понять, почему (открывающий наблюдатель уведомляется перед закрывающим наблюдателем согласно реализации GroupByUntil), но какМогу ли я реализовать это так, чтобы группы не перекрывались?
Я пробовал несколько разных способов, но это всегда заканчивается вариантом одной и той же проблемы.