Редактировать
Обсуждение будет легче с соответствующим примером. Проверка URL-адресов не так уж дорога. Что если вам нужно набрать, например, 100 URL-адресов и выбрать первые 3 ответа?
В этом случае и рабочий, и буфер имеют смысл.
Редактировать 2
Один из комментариев добавляет дополнительную сложность - задачи выполняются одновременно и результаты должны отправляться по мере их поступления.
Для начала ValidateUrl
можно переписать как метод итератора:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
var isValid=await MockValidateUrl(url);
yield return (url, isValid);
}
}
Нет необходимости в рабочей задаче, поскольку все методы асинхронны. Метод итератора не будет выполняться, пока потребитель не запросит результата. Даже если MockValidateUrl
делает что-то дорогое, он может использовать Task.Run
сам или быть завернутым в Task.Run
. Тем не менее, это вызовет немало задач.
Для полноты картины вы можете добавить CancellationToken
и ConfigureAwait(false)
:
public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
IAsyncEnumerable<string> urls,
[EnumeratorCancellation]CancellationToken token=default)
{
await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
{
var isValid=await MockValidateUrl(url).ConfigureAwait(false);
yield return (url,isValid);
}
}
В любом случае, как только вызывающая сторона остановитсяитерация, ValidateUrls
остановится.
Буферизация
Буферизация является проблемой - независимо от того, как она запрограммирована, рабочий не остановится, пока буфер не заполнится. Размер буфера - это количество итераций, которые рабочий должен выполнить, прежде чем он поймет, что ему нужно остановиться. Это отличный пример для канала (да, опять!):
public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
IAsyncEnumerable<string> urls,CancellationToken token=default)
{
var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.WithCancellation(token))
{
var isValid=await MockValidateUrl(url);
await writer.WriteAsync((url,isValid));
}
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader.ReadAllAsync(token);
}
Хотя лучше обойти ChannelReaders, а не IAsyncEnumerables. По крайней мере, асинхронный перечислитель не создается, пока кто-то не попытается прочитать из ChannelReader. Кроме того, проще создавать конвейеры в качестве методов расширения:
public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.ReadAllAsync(token))
{
var isValid=await MockValidateUrl(url);
await writer.WriteAsync((url,isValid));
}
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
Этот синтаксис позволяет быстро создавать конвейеры. Допустим, у нас есть этот вспомогательный метод для преобразования IEnumerables в channesl (или IAsyncEnumerables):
public static ChannelReader<T> AsChannel(
IEnumerable<T> items)
{
var channel=Channel.CreateUnbounded();
var writer=channel.Writer;
foreach(var item in items)
{
channel.TryWrite(item);
}
return channel.Reader;
}
Мы можем написать:
var pipeline=urlList.AsChannel() //takes a list and writes it to a channel
.ValidateUrls();
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Use the items here
}
Параллельные вызовы с немедленным распространением
С каналами это просто, хотя в это время работнику нужно запускать все задачи одновременно. По сути, нам нужно несколько работников. Это не то, что можно сделать с помощью только IAsyncEnumerable.
Прежде всего, если мы хотим использовать, например, 5 одновременных задач для обработки входных данных, мы могли бы написать
var tasks = Enumerable.Range(0,5).
.Select(_ => Task.Run(async ()=>{
///
},token));
_ = Task.WhenAll(tasks)(t=>writer.Complete(t.Exception));
вместо:
_ = Task.Run(async ()=>{
///
},token)
.ContinueWith(t=>writer.Complete(t.Exception));
Использование большого количества работников может быть достаточно. Я не уверен, что IAsyncEnumerable может использоваться несколькими работниками, и я не очень хочу это выяснить.
Преждевременное аннулирование
Все вышеперечисленные работыесли клиент потребляет все результаты. Чтобы остановить обработку, например, после первых 5 результатов, нам нужен CancellationToken:
var cts=new CancellationTokenSource();
var pipeline=urlList.AsChannel() //takes a list and writes it to a channel
.ValidateUrls(cts.Token);
int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Break after 3 iterations
if(i++>2)
{
break;
}
....
}
cts.Cancel();
Этот код сам по себе может быть извлечен в методе, который получает ChannelReader и, в данном случае, CancellationTokenSource:
static async LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
{
int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
//Break after 3 iterations
if(i++>2)
{
break;
}
....
}
cts.Cancel();
}
И конвейер становится:
var cts=new CancellationTokenSource();
var pipeline=urlList.AsChannel()
.ValidateUrls(cts.Token)
.LastStep(cts);