Метод System.Threading.Channels ReadAsyn c () блокирует выполнение - PullRequest
0 голосов
/ 03 августа 2020

Обзор

Я пытаюсь написать оболочку IAsyncEnumerable<T> вокруг интерфейса IObserver<T>. Сначала я использовал BufferBlock<T> в качестве резервного хранилища данных, но в ходе тестирования производительности и исследований выяснил, что на самом деле это довольно медленный тип, поэтому я решил присвоить типу System.Threading.Channels.Channel тип go. У меня была аналогичная проблема с моей реализацией BufferBlock, как у этой, но на этот раз я не знаю, как ее решить.

Проблема

Мой GetAsyncEnumerator() l oop блокируется вызовом await _channel.Reader.WaitToRead(token), если мой метод IObserver<T>.OnNext() еще не записал в _channel. Как правильно подождать, пока значение станет доступным для уступки в этом контексте без блокировки выполнения программы?

Реализация

public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>, IObserver<T>, IDisposable
{
    private readonly IDisposable _unsubscriber;
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    private bool _producerComplete;

    public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public async void OnNext(T value)
    {
        Log.Logger.Verbose("Adding value to Channel.");
        await _channel.Writer.WriteAsync(value);
    }

    public void OnError(Exception error)
    {
        _channel.Writer.Complete(error);
    }

    public void OnCompleted()
    {
        _producerComplete = true;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator([EnumeratorCancellation] CancellationToken token = new CancellationToken())
    {
        Log.Logger.Verbose("Starting async iteration...");
        while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
        {
            Log.Logger.Verbose("Reading...");
            while (_channel.Reader.TryRead(out var item))
            {
                Log.Logger.Verbose("Yielding item.");
                yield return item;
            }
            Log.Logger.Verbose("Awaiting more items.");
        }
        Log.Logger.Verbose("Iteration Complete.");
        _channel.Writer.Complete();
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
        _unsubscriber?.Dispose();
    }
}

введите описание изображения здесь

Дополнительный контекст

Это не имеет значения, но во время выполнения экземпляр IObservable<T>, переданный в конструктор, является CimAsyncResult, возвращенным из asyn c звонков на Microsoft.Management.Infrastructure apis. Они используют шаблон проектирования Observer, который я пытаюсь обернуть с помощью причудливого нового шаблона перечисления asyn c.

Edit

Обновлено с регистрацией в вывод отладчика и сделал мой метод OnNext() async / await, как предложил один из комментаторов. Вы можете видеть, что он никогда не входит в while() l oop.

1 Ответ

1 голос
/ 04 августа 2020

Далее по стеку вызовов я вызывал метод asyn c синхронно через методы GetAwaiter (). GetResult ().

Ага, это проблема .

Я сделал это, потому что в одном случае я хотел получить данные из конструктора. Я изменил эту реализацию, чтобы выполнить вызов с помощью Task.Run (), и теперь итераторы работают безупречно с обеими реализациями.

Есть лучшие решения, чем блокировка асинхронного кода. Использование Task.Run - это один из способов избежать тупиковой ситуации , но вы по-прежнему получаете неудовлетворительный пользовательский опыт (я предполагаю, что ваше приложение пользовательского интерфейса, поскольку существует SynchronizationContext).

Если для загрузки данных для отображения используется асинхронный перечислитель, то более правильным решением будет (синхронно) инициализировать пользовательский интерфейс в состояние «Загрузка ...», а затем обновить это состояние, когда данные загружаются асинхронно . Если асинхронный перечислитель используется для чего-то еще, вы можете найти подходящие альтернативные шаблоны в моем сообщении блога о конструкторах asyn c .

...