Я пытаюсь создать процесс потока данных производителя / потребителя. В рамках этого я буду создавать несколько задач производителя для разных диапазонов идентификаторов, которые будут генерировать записи данных для обработки. Поэтому я планирую ограничить количество потоков, которые будут подключаться к базе данных, чтобы получать данные и быть активными одновременно.
Однако метод, который извлекает данные и отправляет их в BufferBlock, имеет тип возврата, так что я могу получить количество извлеченных записей. Поэтому я не могу понять, где вызывать метод Release из класса SemaphoreSlim, но все еще могу получить возвращаемое значение? Ниже приведен пример кода, который я использую. Может кто-нибудь предложить какие-либо обходные пути для этого?
private Task<KeyRange>[] DataExtractProducer(ITargetBlock<DataRow[]> targetBuffer, ExtractQueryConfiguration QueryConf)
{
CancellationToken cancelToken = cancelTokenSrc.Token;
var tasks = new List<Task<KeyRange>>();
int taskCount = 0;
int maxThreads = QueryConf.MaxThreadsLimit > 0 ? QueryConf.MaxThreadsLimit : DataExtConstants.DefaultMaxThreads;
using (SemaphoreSlim concurrency = new SemaphoreSlim(maxThreads))
{
foreach (KeyRange range in keyRangeList)
{
concurrency.WaitAsync();
var task = Task.Run(() =>
{
Console.WriteLine(MsgConstants.StartKeyRangeExtract, QueryConf.KeyColumn, range.StartValue, range.EndValue);
//concurrency.Release();
return GetDataTask(targetBuffer, range, QueryConf.ExtractQuery);
}, cancelToken);
tasks.Add(task);
taskCount++;
}
}
Console.WriteLine(MsgConstants.TaskCountMessage, taskCount);
return tasks.ToArray<Task<KeyRange>>();
}
Редактировать: я пробовал этот вариант также. Но это не похоже на работу. Я пытался с пределом 20. Но я вижу более 50 подключений к БД. В конце концов, я достигаю высокого потребления памяти из-за нерегулируемых соединений.
using (SemaphoreSlim concurrency = new SemaphoreSlim(maxThreads))
{
foreach (KeyRange range in keyRangeList)
{
concurrency.WaitAsync();
var task = Task.Run(() =>
{
Console.WriteLine(MsgConstants.StartKeyRangeExtract, QueryConf.KeyColumn, range.StartValue, range.EndValue);
var temptask = await GetDataTask(targetBuffer, range, QueryConf.ExtractQuery);
concurrency.Release();
return temptask;
}, cancelToken);
tasks.Add(task);
taskCount++;
}
}
Спасибо!