Можно ожидать, что реализация Repeat
будет содержать уведомление OnCompleted
, но получается , она реализована с точки зрения Concat
-из бесконечного потока.
public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
RepeatInfinite(source).Concat();
private static IEnumerable<T> RepeatInfinite<T>(T value)
{
while (true)
{
yield return value;
}
}
С учетом того, что ответственность перешла на Concat
- мы можем создать упрощенную версию (подробности реализации gory приведены в TailRecursiveSink.cs
). Это все еще продолжает вращаться, если нет другого контекста выполнения, предоставленного await Task.Yield()
.
public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
Observable.Create<T>(observer =>
{
var check = new BooleanDisposable();
IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
{
if (check.IsDisposed)
return Disposable.Empty;
if (enumerator.MoveNext()) //this never returns false
return enumerator.Current.Subscribe(
observer.OnNext,
() => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
);
else
return inner.Schedule(observer.OnCompleted); //this never runs
}
Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
return check;
});
Будучи бесконечным потоком, enumerator.MoveNext()
всегда возвращает true, поэтому другая ветвь никогда не запускается - это ожидаемо; это не наша проблема.
Когда вызывается o.OnCompleted()
, он сразу же планирует следующий итеративный l oop в Schedule(enumerator, loopRec)
, который синхронно вызывает следующий o.OnCompleted()
, и он продолжается до бесконечности - нет Точка, где он может избежать этой рекурсии.
Если у вас есть переключатель контекста с await Task.Yield()
, то Schedule(enumerator, loopRec)
немедленно завершается, и o.OnCompleted()
вызывается несинхронно.
Repeat
и Concat
используют текущий поток для работы без изменения контекста - это не некорректное поведение, но когда тот же контекст используется и для уведомлений pu sh, это может привести к тупикам или попаданию в вечный батут.
стек аннотированных вызовов
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code]
Main(args) //incremental.RepeatEx()...