Кажется, что у вас есть ряд заданий ввода-вывода и ЦП, которые вам нужно выполнять одно за другим, с разной степенью параллелизма, необходимого для каждого шага. Хорошим инструментом для работы с такого рода нагрузками является библиотека TPL Dataflow . Эта библиотека разработана таким образом, что позволяет формировать конвейеры (или даже сложные сети) данных, которые передаются из одного блока в другой. Я попытался придумать пример, демонстрирующий использование этой библиотеки, а затем понял, что ваш рабочий процесс включает в себя последний шаг, где должно быть обновлено свойство (internalLink.PossibleResults
), которое относится к первому типу элемента, поступающего в конвейер. Это сильно усложняет ситуацию, поскольку подразумевает, что первый тип должен переноситься по всем этапам конвейера. Возможно, самый простой способ сделать это - использовать ValueTuple
s в качестве входных и выходных блоков. Это сделало бы мой пример слишком запутанным, поэтому я предпочел сохранить его в простейшем виде, поскольку его цель в основном состоит в демонстрации возможностей библиотеки потока данных TPL:
var cts = new CancellationTokenSource();
var restClient = new RestClient();
var block1 = new TransformBlock<InternalLinksModel, RestResponse>(async item =>
{
return await restClient.ExecuteTaskAsync(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10, // 10 concurrent REST requests max
CancellationToken = cts.Token, // Cancel at any time
});
var block2 = new TransformBlock<RestResponse, ResponseModel>(item =>
{
return JsonConvert.DeserializeObject<ResponseModel>(item.Content);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2, // 2 threads max for this CPU bound job
CancellationToken = cts.Token, // Cancel at any time
});
var block3 = new TransformBlock<ResponseModel, string>(async item =>
{
return await SearchValue(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10, // Concurrency 10 for this I/O bound job
CancellationToken = cts.Token, // Cancel at any time
});
var block4 = new ActionBlock<string>(item =>
{
ValidateSearchResult(item);
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1, // 1 thread max for this CPU bound job
CancellationToken = cts.Token, // Cancel at any time
});
block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });
var internalLinks = new List<InternalLinksModel>();
// Populate internalLinks with the data
foreach (var internalLink in internalLinks)
{
await block1.SendAsync(internalLink);
}
block1.Complete();
await block4.Completion;
Используются два типа блоковв этом примере TransformBlock
и ActionBlock
. ActionBlock
обычно является последним блоком конвейера, так как он не производит никакого вывода. В случае, если ваша рабочая нагрузка слишком гранулированная, и накладные расходы, связанные с передачей объектов, сравнимы с самой рабочей нагрузкой, вы можете запустить конвейер с BatchBlock
, а затем обработать следующие шаги партиями,скажем, 10 элементов каждый. Похоже, что это не требуется в вашем случае, поскольку создание веб-запросов и анализ ответов JSON - довольно громоздкая работа.