Обзор
Я пытаюсь написать оболочку IAsyncEnumerable<T>
вокруг интерфейса IObserver<T>
. Сначала я использовал BufferBlock<T>
в качестве резервного хранилища данных, но в ходе тестирования производительности и исследований выяснил, что на самом деле это довольно медленный тип, поэтому я решил присвоить типу System.Threading.Channels.Channel
тип go. У меня была аналогичная проблема с моей реализацией BufferBlock, как у этой, но на этот раз я не знаю, как ее решить.
Проблема
Мой GetAsyncEnumerator()
l oop блокируется вызовом await _channel.Reader.WaitToRead(token)
, если мой метод IObserver<T>.OnNext()
еще не записал в _channel
. Как правильно подождать, пока значение станет доступным для уступки в этом контексте без блокировки выполнения программы?
Реализация
public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>, IObserver<T>, IDisposable
{
private readonly IDisposable _unsubscriber;
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
private bool _producerComplete;
public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
{
_unsubscriber = provider.Subscribe(this);
}
public async void OnNext(T value)
{
Log.Logger.Verbose("Adding value to Channel.");
await _channel.Writer.WriteAsync(value);
}
public void OnError(Exception error)
{
_channel.Writer.Complete(error);
}
public void OnCompleted()
{
_producerComplete = true;
}
public async IAsyncEnumerator<T> GetAsyncEnumerator([EnumeratorCancellation] CancellationToken token = new CancellationToken())
{
Log.Logger.Verbose("Starting async iteration...");
while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
{
Log.Logger.Verbose("Reading...");
while (_channel.Reader.TryRead(out var item))
{
Log.Logger.Verbose("Yielding item.");
yield return item;
}
Log.Logger.Verbose("Awaiting more items.");
}
Log.Logger.Verbose("Iteration Complete.");
_channel.Writer.Complete();
}
public void Dispose()
{
_channel.Writer.Complete();
_unsubscriber?.Dispose();
}
}
введите описание изображения здесь
Дополнительный контекст
Это не имеет значения, но во время выполнения экземпляр IObservable<T>
, переданный в конструктор, является CimAsyncResult
, возвращенным из asyn c звонков на Microsoft.Management.Infrastructure
apis. Они используют шаблон проектирования Observer, который я пытаюсь обернуть с помощью причудливого нового шаблона перечисления asyn c.
Edit
Обновлено с регистрацией в вывод отладчика и сделал мой метод OnNext()
async / await, как предложил один из комментаторов. Вы можете видеть, что он никогда не входит в while()
l oop.