Observable.Repeat невозможно остановить, это ошибка или особенность? - PullRequest
0 голосов
/ 03 апреля 2020

Я заметил кое-что странное с поведением оператора Repeat, когда уведомления наблюдаемой источника синхронны. Получаемая наблюдаемая не может быть остановлена ​​последующим оператором TakeWhile и, очевидно, продолжает работать вечно. Для демонстрации я создал наблюдаемую исходную информацию, которая выдает единственное значение, которое увеличивается на каждую подписку. Первый подписчик получает значение 1, второй получает значение 2 et c:

int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();

    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});

Затем я прикрепил операторы Repeat, TakeWhile и LastAsync к этому наблюдаемому, так что программа будет ждать, пока составная наблюдаемая не выдаст свое последнее значение:

incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}

Вот результат этой программы:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...

Это никогда не заканчивается! Хотя LastAsync произвел свое значение и завершил работу, оператор Repeat продолжает вращаться!

Это происходит только в том случае, если наблюдаемый источник уведомляет своих подписчиков синхронно. Например, после раскомментирования строки //await Task.Yield(); программа ведет себя ожидаемым образом:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done

Оператор Repeat прекращает вращение, хотя и не сообщает о завершении (я предполагаю, что оно было отписано).

Есть ли способ добиться согласованного поведения оператора Repeat, независимо от типа получаемых им уведомлений (syn c или asyn c)?

. NET Core 3.0, C# 8, System.Reactive 4.3.2, консольное приложение

1 Ответ

1 голос
/ 07 апреля 2020

Можно ожидать, что реализация Repeat будет содержать уведомление OnCompleted, но получается , она реализована с точки зрения Concat -из бесконечного потока.

    public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
        RepeatInfinite(source).Concat();

    private static IEnumerable<T> RepeatInfinite<T>(T value)
    {
        while (true)
        {
            yield return value;
        }
    }

С учетом того, что ответственность перешла на Concat - мы можем создать упрощенную версию (подробности реализации gory приведены в TailRecursiveSink.cs). Это все еще продолжает вращаться, если нет другого контекста выполнения, предоставленного await Task.Yield().

public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
    Observable.Create<T>(observer =>
    {
        var check = new BooleanDisposable();

        IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
        {
            if (check.IsDisposed)
                return Disposable.Empty;

            if (enumerator.MoveNext()) //this never returns false
                return enumerator.Current.Subscribe(
                    observer.OnNext,
                    () => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
                );
            else
                return inner.Schedule(observer.OnCompleted); //this never runs
        }

        Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
        return check;
    });

Будучи бесконечным потоком, enumerator.MoveNext() всегда возвращает true, поэтому другая ветвь никогда не запускается - это ожидаемо; это не наша проблема.

Когда вызывается o.OnCompleted(), он сразу же планирует следующий итеративный l oop в Schedule(enumerator, loopRec), который синхронно вызывает следующий o.OnCompleted(), и он продолжается до бесконечности - нет Точка, где он может избежать этой рекурсии.

Если у вас есть переключатель контекста с await Task.Yield(), то Schedule(enumerator, loopRec) немедленно завершается, и o.OnCompleted() вызывается несинхронно.

Repeat и Concat используют текущий поток для работы без изменения контекста - это не некорректное поведение, но когда тот же контекст используется и для уведомлений pu sh, это может привести к тупикам или попаданию в вечный батут.

стек аннотированных вызовов

[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code] 
Main(args) //incremental.RepeatEx()...
...