Не перекрывающиеся реактивные расширения, последовательный GroupBy (или WindowUntilChange) - PullRequest
0 голосов
/ 26 октября 2018

Я попытался сделать оператор SerialGroupBy для Rx.Net.Цель оператора - работать как GroupBy, но каждый раз, когда создается новая группа, первая завершается.Таким образом, одновременно не может быть открыто более одной группы.

Моя текущая "лучшая" реализация выглядит примерно так:

public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
    this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
    stream.Publish(shared => 
        shared.GroupByUntil(keySelector, elementSelector, group => 
            shared.DistinctUntilChanged(keySelector)));

И я надеялся закрыть группу до того, как следующая группа начнется, как показано здесь:

[Fact]
public void SerialGroupBy()
{
    var scheduler = new TestScheduler();

    var stream = scheduler.CreateHotObservable(
        OnNext(201, "First group"),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));

    var observer = scheduler.CreateObserver<string>();

    stream.SerialGroupBy(x => x.Length, x => x)
        .Select(x => x.Subscribe(observer))
        .Subscribe();

    scheduler.Start();

    observer.Messages.ShouldBeLike(
        OnNext(201, "First group"),
        OnCompleted<string>(202),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));
}

Но завершение первых групп приходит слишком поздно, например:

OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));

Я могу понять, почему (открывающий наблюдатель уведомляется перед закрывающим наблюдателем согласно реализации GroupByUntil), но какМогу ли я реализовать это так, чтобы группы не перекрывались?

Я пробовал несколько разных способов, но это всегда заканчивается вариантом одной и той же проблемы.

...