Код пользователя никогда не должен контролировать, как задачи планируются напрямую. Во-первых, он не может - управление тем, как выполняются задачи, является задачей TaskScheduler . Когда код пользователя вызывает .Start()
, он просто добавляет задачу в очередь пула потоков для выполнения. await
выполняет уже выполняемые задачи.
В примерах TaskScheduler показано, как создавать ограниченные планировщики параллелизма, но, опять же, существуют более качественные параметры высокого уровня.
Код вопроса в любом случае не ограничивает задачи, стоящие в очереди, он ограничивает их количество. Они все уже бегут. Это похоже на пакетную предыдущую асинхронную операцию в конвейере, позволяющую только ограниченному количеству сообщений перейти на следующий уровень.
ActionBlock с задержкой
Простым, готовым к использованию способом было бы использование ActionBlock с ограниченным MaxDegreeOfParallelism, чтобы обеспечить не более N одновременных Операции могут выполняться одновременно. Если мы знаем, сколько времени занимает каждая операция, мы могли бы добавить небольшую задержку, чтобы убедиться, что мы не превышаем ограничение газа.
В этом случае 7 одновременно работающих рабочих выполняют 4 запроса в секунду, в общей сложности 28 максимальных запросов в секунду. BoundedCapacity
означает, что только до 7 элементов будут сохранены во входном буфере до блоков downloader.SendAsync
. Таким образом, мы избегаем переполнения ActionBlock
, если операции занимают слишком много времени.
var downloader = new ActionBlock<string>(
async url => {
await Task.Delay(250);
var response=await httpClient.GetStringAsync(url);
//Do something with it.
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity=7 }
);
//Start posting to the downloader
foreach(var item in urls)
{
await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;
ActionBlock с SemaphoreSlim
Другой вариант - объединить это с SemaphoreSlim
, который периодически сбрасывается таймером.
var refreshTimer = new Timer(_=>sm.Release(30));
var downloader = new ActionBlock<string>(
async url => {
await semaphore.WaitAsync();
try
{
var response=await httpClient.GetStringAsync(url);
//Do something with it.
}
finally
{
semaphore.Release();
}
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity=5 }
);
//Start the timer right before we start posting
refreshTimer.Change(1000,1000);
foreach(....)
{
}