Код вопроса является синхронным, поэтому выполняется только одно выполненное задание. async
не заставляет что-то работать асинхронно, это синтаксический сахар, который позволяет использовать await
для ожидания уже выполнения асинхронной операции для завершения без блокировки вызывающего потока.
Что касается примера документации, это то, что он есть. Пример документации, а не шаблон и, конечно, не то, что можно использовать в производстве, кроме простых случаев.
Что произойдет, если вы сможете сделать только 5 запросов одновременно, чтобы избежать переполнения вашей сети или процессора? Вам нужно будет загрузить только фиксированное количество записей для этого. Что если вам нужно обработать загруженные данные? Что если список URL-адресов поступает из другого потока?
Эти проблемы решаются с помощью одновременных контейнеров, шаблонов pub / sub и специально созданных классов Dataflow и Channel.
Поток данных
Старые классы Dataflow обеспечивают автоматическую буферизацию ввода-вывода и автоматическую обработку рабочих задач. Весь код загрузки можно заменить на ActionBlock :
var client=new HttpClient(....);
//Cancel if the process takes longer than 30 minutes
var cts=new CancellationTokenSource(TimeSpan.FromMinutes(30));
var options=new ExecutionDataflowBlockOptions(){
MaxDegreeOfParallelism=10,
BoundedCapacity=5,
CancellationToken=cts.Token
};
var block=new ActionBlock<string>(url=>ProcessUrl(url,client,cts.Token));
Вот и все. Блок будет использовать до 10 одновременных задач для выполнения до 10 одновременных загрузок. Он будет хранить до 5 URL в памяти (иначе все буферизуется). Если входной буфер заполнится, отправка элементов в блок будет выполняться асинхронно, что предотвращает медленную загрузку из-за переполнения памяти URL-адресами.
В том же или другом потоке «издатель» URL может публиковатьмного URL-адресов по своему усмотрению, сколько угодно.
foreach(var url in urls)
{
await block.SendAsync(url);
}
//Tell the block we're done
block.Complete();
//Wait until all downloads are complete
await block.Completion;
Мы можем использовать другие блоки, такие как TransformBlock, для получения вывода, передачи его в другой блок и, таким образом, для создания параллельного конвейера обработки. Допустим, у нас есть два метода, DownloadURL
и ParseResponse
вместо просто ProcessUrl
:
Task<string> DownloadUrlAsync(string url,HttpClient client)
{
return client.GetStringAsync(url);
}
void ParseResponse(string content)
{
var object=JObject.Parse();
DoSomethingWith(object);
}
Мы могли бы создать отдельный блок для каждого шага в конвейере с различными DOP и буферами:
var dlOptions=new ExecutionDataflowBlockOptions(){
MaxDegreeOfParallelism=5,
BoundedCapacity=5,
CancellationToken=cts.Token
};
var downloader=new TransformBlock<string,string>(
url=>DownloadUrlAsync(url,client),
dlOptions);
var parseOptions = new ExecutionDataflowBlockOptions(){
MaxDegreeOfParallelism=10,
BoundedCapacity=2,
CancellationToken=cts.Token
};
var parser=new ActionBlock<string>(ParseResponse);
downloader.LinkTo(parser, new DataflowLinkOptions{PropageateCompletion=true});
Мы можем публиковать URL-адреса для загрузчика и ждать, пока все они будут проанализированы. Используя разные DOP и возможности, мы можем сбалансировать количество задач загрузчика и анализатора, чтобы загрузить столько URL-адресов, сколько мы можем проанализировать и обработать, например, медленные загрузки или большие ответы.
foreach(var url in urls)
{
await downloader.SendAsync(url);
}
//Tell the block we're done
downloader.Complete();
//Wait until all urls are parsed
await parser.Completion;
Каналы
System.Threading.Channels представляет каналы в стиле Go. Это на самом деле низкоуровневые концепции, которые блокирует поток данных. Если бы каналы были доступны еще в 2012 году, они были бы написаны с использованием каналов.
Эквивалентный метод загрузки будет выглядеть следующим образом:
ChannelReader<string> Downloader(ChannelReader<string> ulrs,HttpClient client,
int capacity,CancellationToken token=default)
{
var channel=Channel.CreateBounded(capacity);
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var url in urls.ReadAsStreamAsync(token))
{
var response=await client.GetStringAsync(url);
await writer.WriteAsync(response);
}
}).ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
Это более многословно, но позволяет нам делать такие вещи, как создание HttpClient в методе и его повторное использование. ,Использование ChannelReader в качестве входных и выходных данных может показаться странным, но теперь мы можем связать такие методы, просто передав считыватель в качестве входных данных другому методу.
"Волшебство" заключается в том, что мы создаем рабочую задачу, которая ожидает обработки сообщений и немедленно возвращает читателя. Всякий раз, когда получается результат, он отправляется на канал и следующий шаг в конвейере.
Чтобы использовать несколько рабочих задач, мы можем использовать Enumerable.Range
, чтобы запустить многие из них, и Task.WhenAny
, чтобы закрытьканал, когда все каналы готовы:
ChannelReader<string> Downloader(ChannelReader<string> ulrs,HttpClient client,
int capacity,int dop,CancellationToken token=default)
{
var channel=Channel.CreateBounded(capacity);
var writer=channel.Writer;
var tasks = Enumerable
.Range(0,dop)
.Select(_=> Task.Run(async ()=>{
await foreach(var url in urls.ReadAllAsync(token))
{
var response=await client.GetStringAsync(url);
await writer.WriteAsync(response);
}
});
_=Task.WhenAll(tasks)
.ContinueWith(t=>writer.Complete(t.Exception));
return channel.Reader;
}
Издатели могут создать свой собственный канал и передать считыватель методу Downloader
. Им также не нужно ничего публиковать заранее:
var channel=Channel.CreateUnbounded<string>();
var dlReader=Downloader(channel.Reader,client,5,5);
foreach(var url in someUrlList)
{
await channel.Writer.WriteAsync(url);
}
channel.Writer.Complete();
Свободные конвейеры
Это так часто, что кто-то может создать метод расширения для этого. Например, чтобы преобразовать IList в Channel<T>
, нам не нужно ждать, так как все результаты уже доступны:
ChannelReader<T> Generate<T>(this IEnumerable<T> source)
{
var channel=Channel.CreateUnbounded<T>();
foreach(var item in source)
{
channel.Writer.TryWrite(T);
}
channel.Writer.Complete();
return channel.Reader;
}
Если мы также преобразуем Downloader
в метод расширения, мыможно использовать:
var pipeline= someUrls.Generate()
.Downloader(client,5,5);