Я посмотрел на источник из ToObservable
и нашел минимальную реализацию. Он воспроизводит поведение, которое мы наблюдаем.
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
}
else
{
observer.OnCompleted();
}
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
С этим не по пути - суть проблемы заключается в поведении CurrentThreadScheduler
, который используется планировщиком по умолчанию.
Поведение CurrentThreadScheduler
заключается в том, что если расписание уже запущено , когда вызывается Schedule
- оно в итоге ставится в очередь.
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
Печать 2 1
. Такое поведение в очереди является нашей отменой.
Когда вызывается observer.OnCompleted()
, это заставляет Concat
запустить следующее перечисление - однако, вещи не такие, как когда мы начинали - мы все еще внутри observer => { }
блок, когда мы пытаемся запланировать следующий. Таким образом, вместо немедленного выполнения, следующий график ставится в очередь.
Теперь enumerator.MoveNext()
пойман в тупик. Он не может перейти к следующему элементу - MoveNext
блокируется до тех пор, пока следующий элемент не прибудет - который может быть доставлен только по расписанию ToObservable
l oop.
Но планировщик может работать только для уведомления ToEnumerable
, а затем и MoveNext()
, который удерживается - как только он выходит из loopRec
- чего он не может, потому что он заблокирован MoveNext
в первое место.
Приложение
Это примерно то, что ToEnumerable
(из GetEnumerator.cs ) делает (недопустимая реализация):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait(); //this is where it blocks
if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}
Ожидается, что перечислимые объекты будут блокироваться до тех пор, пока не будет получен следующий элемент - и поэтому существует реализация гейтинга. Это не Enumerable.Range
, который блокирует, а ToEnumerable
.