Я хочу ограничить общее количество запросов, которые я отправляю на сервер базы данных по всем блокам DataFlow, до 30. В следующем сценарии регулирование 30 одновременных задач происходит в каждом блоке, поэтому при выполнении всегда выполняется 60 одновременных задач.Очевидно, что я мог бы ограничить свой параллелизм до 15 на блок, чтобы получить общую системную сумму 30, но это не было бы оптимальным.
Как мне сделать эту работу?Должен ли я ограничивать (и блокировать) свои ожидания с помощью SemaphoreSlim и т. Д., Или есть внутренний подход DataFlow, который работает лучше?
public class TPLTest
{
private long AsyncCount = 0;
private long MaxAsyncCount = 0;
private long TaskId = 0;
private object MetricsLock = new object();
public async Task Start()
{
ExecutionDataflowBlockOptions execOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
DataflowLinkOptions linkOption = new DataflowLinkOptions() { PropagateCompletion = true };
var doFirstIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doCPUWork = new TransformBlock<Data, Data>(data => DoCPUBoundWork(data));
var doSecondIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doProcess = new TransformBlock<Data, string>(i => $"Task finished, ID = : {i.TaskId}");
var doPrint = new ActionBlock<string>(s => Debug.WriteLine(s));
doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
doProcess.LinkTo(doPrint, linkOption);
int taskCount = 150;
for (int i = 0; i < taskCount; i++)
{
await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
}
doFirstIOWorkAsync.Complete();
await doPrint.Completion;
Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
}
private async Task<Data> DoIOBoundWorkAsync(Data data)
{
lock(MetricsLock)
{
AsyncCount++;
if (AsyncCount > MaxAsyncCount)
MaxAsyncCount = AsyncCount;
}
if (data.TaskId <= 0)
data.TaskId = Interlocked.Increment(ref TaskId);
await Task.Delay(data.Delay);
lock (MetricsLock)
AsyncCount--;
return data;
}
private Data DoCPUBoundWork(Data data)
{
data.Step = 1;
return data;
}
}
Класс данных:
public class Data
{
public int Delay { get; set; }
public long TaskId { get; set; }
public int Step { get; set; }
}
Начальная точка:
TPLTest tpl = new TPLTest();
await tpl.Start();