Класс ChannelReader<T>
имеет метод ReadAllAsync
, который предоставляет данные считывателя как IAsyncEnumerable<T>
. Ниже приведена перегрузка этого метода, которая также принимает параметр timeout
. Этот параметр приводит к тому, что в случае, если считыватель не может испустить какие-либо предметы в течение указанного промежутка времени, выдается TimeoutException
.
Для сокращения выделений он использует ту же хитрую технику из Грега ответ , с одним CancellationTokenSource
, который перепланируется для отмены после каждой итерации. Подумав немного, я удалил строку CancelAfter(int.MaxValue)
, потому что в общем случае она скорее вредна, чем полезна, но я могу ошибаться.
public static async IAsyncEnumerable<TSource> ReadAllAsync<TSource>(
this ChannelReader<TSource> source, TimeSpan timeout,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (true)
{
using var cts = new CancellationTokenSource();
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cts.Token, cancellationToken);
cts.CancelAfter(timeout);
while (true)
{
try
{
if (!await source.WaitToReadAsync(linkedCts.Token).ConfigureAwait(false))
yield break;
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
throw new TimeoutException();
}
while (source.TryRead(out var item))
{
yield return item;
}
cts.CancelAfter(timeout);
if (cts.IsCancellationRequested) break;
}
}
}
Как я отмечаю, System.Interactive. Пакет Asyn c включает в себя метод Timeout
с подписью, показанной ниже, который можно использовать в сочетании со встроенным ReadAllAsync
и предоставлять те же функциональные возможности с вышеприведенной реализацией. Этот метод не оптимизирован для низких распределений.
public static IAsyncEnumerable<TSource> Timeout<TSource>(
this IAsyncEnumerable<TSource> source, TimeSpan timeout);