Использование SemaphoreSlim для задач с типом возврата - PullRequest
0 голосов
/ 28 августа 2018

Я пытаюсь создать процесс потока данных производителя / потребителя. В рамках этого я буду создавать несколько задач производителя для разных диапазонов идентификаторов, которые будут генерировать записи данных для обработки. Поэтому я планирую ограничить количество потоков, которые будут подключаться к базе данных, чтобы получать данные и быть активными одновременно.

Однако метод, который извлекает данные и отправляет их в BufferBlock, имеет тип возврата, так что я могу получить количество извлеченных записей. Поэтому я не могу понять, где вызывать метод Release из класса SemaphoreSlim, но все еще могу получить возвращаемое значение? Ниже приведен пример кода, который я использую. Может кто-нибудь предложить какие-либо обходные пути для этого?

private Task<KeyRange>[] DataExtractProducer(ITargetBlock<DataRow[]> targetBuffer, ExtractQueryConfiguration QueryConf)
{
    CancellationToken cancelToken = cancelTokenSrc.Token;
    var tasks = new List<Task<KeyRange>>();
    int taskCount = 0;
    int maxThreads = QueryConf.MaxThreadsLimit > 0 ? QueryConf.MaxThreadsLimit : DataExtConstants.DefaultMaxThreads;

    using (SemaphoreSlim concurrency = new SemaphoreSlim(maxThreads))
    {
        foreach (KeyRange range in keyRangeList)
        {
            concurrency.WaitAsync();

            var task = Task.Run(() =>
            {
                Console.WriteLine(MsgConstants.StartKeyRangeExtract, QueryConf.KeyColumn, range.StartValue, range.EndValue);
                //concurrency.Release();
                return GetDataTask(targetBuffer, range, QueryConf.ExtractQuery);
            }, cancelToken);

            tasks.Add(task);
            taskCount++;
        }
    }

    Console.WriteLine(MsgConstants.TaskCountMessage, taskCount);

    return tasks.ToArray<Task<KeyRange>>();
}

Редактировать: я пробовал этот вариант также. Но это не похоже на работу. Я пытался с пределом 20. Но я вижу более 50 подключений к БД. В конце концов, я достигаю высокого потребления памяти из-за нерегулируемых соединений.

using (SemaphoreSlim concurrency = new SemaphoreSlim(maxThreads))
        {
            foreach (KeyRange range in keyRangeList)
            {
                concurrency.WaitAsync();

                var task = Task.Run(() =>
                {
                    Console.WriteLine(MsgConstants.StartKeyRangeExtract, QueryConf.KeyColumn, range.StartValue, range.EndValue);                    
                    var temptask = await GetDataTask(targetBuffer, range, QueryConf.ExtractQuery);
                    concurrency.Release();
                    return temptask;
                }, cancelToken);

                tasks.Add(task);
                taskCount++;
            }
    }

Спасибо!

...