Почему повторяется блок перечисления Enumerable to Observable - PullRequest
5 голосов
/ 05 апреля 2020

Это довольно познавательный вопрос из любопытства. Рассмотрим следующий фрагмент:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

Где SubscribeDebug подписывается простым наблюдателем:

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}

Вывод этого:

Значение: 0

Значение: 1

Значение: 2

Значение: 3

Значение: 4

А затем блоки. Может ли кто-нибудь помочь мне понять основную причину, почему это происходит и почему наблюдаемое не завершается? Я заметил, что он завершается без вызова Concat, но блокируется с ним.

1 Ответ

7 голосов
/ 06 апреля 2020

Я посмотрел на источник из ToObservable и нашел минимальную реализацию. Он воспроизводит поведение, которое мы наблюдаем.

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>
        (
            observer =>
            {
                IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                {
                    if (enumerator.MoveNext()) 
                    {
                        observer.OnNext(enumerator.Current);
                        inner.Schedule(enumerator, loopRec); //<-- culprit
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    // ToObservable.cs Line 117
                    // We never allow the scheduled work to be cancelled. 
                    return Disposable.Empty;
                }

                return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
            }
        );

С этим не по пути - суть проблемы заключается в поведении CurrentThreadScheduler, который используется планировщиком по умолчанию.

Поведение CurrentThreadScheduler заключается в том, что если расписание уже запущено , когда вызывается Schedule - оно в итоге ставится в очередь.

        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        });

Печать 2 1. Такое поведение в очереди является нашей отменой.

Когда вызывается observer.OnCompleted(), это заставляет Concat запустить следующее перечисление - однако, вещи не такие, как когда мы начинали - мы все еще внутри observer => { } блок, когда мы пытаемся запланировать следующий. Таким образом, вместо немедленного выполнения, следующий график ставится в очередь.

Теперь enumerator.MoveNext() пойман в тупик. Он не может перейти к следующему элементу - MoveNext блокируется до тех пор, пока следующий элемент не прибудет - который может быть доставлен только по расписанию ToObservable l oop.

Но планировщик может работать только для уведомления ToEnumerable, а затем и MoveNext(), который удерживается - как только он выходит из loopRec - чего он не может, потому что он заблокирован MoveNext в первое место.

Приложение

Это примерно то, что ToEnumerable (из GetEnumerator.cs ) делает (недопустимая реализация):

    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using(observable.Subscribe(
            value => { queue.Enqueue(value); gate.Release(); }, 
            () => gate.Release()))
        while (true)
        {
            gate.Wait(); //this is where it blocks                

            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        }
    }

Ожидается, что перечислимые объекты будут блокироваться до тех пор, пока не будет получен следующий элемент - и поэтому существует реализация гейтинга. Это не Enumerable.Range, который блокирует, а ToEnumerable.

...