C # многопоточный цикл foreach - PullRequest
0 голосов
/ 11 октября 2019

Я недавно начал работать над многопоточными вызовами с C #, и я не уверен, правильно ли это или нет.

Как я могу сделать это быстрее? Я предполагаю, что это с параллелизмом, но мне не удалось интегрировать эту концепцию в это.

Редактирует

Обратите внимание, что это работает на удаленной виртуальной машине, и это консольная программа;смысл пользовательского опыта не является проблемой. Я просто хочу, чтобы это работало быстро, поскольку количество ссылок может доходить до 200 тыс. Элементов, и мы хотим получить результаты как можно скорее. Я также удалил все вопросы, кроме одного, так как это тот, который мне нужна помощь.

Вот мой код, который, кажется, работает:

// Use of my results
public void Main() 
{
  var results = ValidateInternalLinks();
  // Writes results to txt file
  WriteResults(results.Result, "Internal Links");
}

// Validation of data
public async Task<List<InternalLinksModel>> ValidateInternalLinks() 
{
  var tasks = new List<Task>();
  var InternalLinks = new List<InternalLinksModel>();
  // Populate InternalLinks with the data

  foreach (var internalLink in InternalLinks)
  {
    tasks.Add(GetResults(internalLink));
  }

  await Task.WhenAll(tasks);

  return InternalLinks;
}

// Get Results for each piece of data
public async Task GetResults(InternalLinksModel internalLink)
{ 
  var response = await SearchValue(internalLink.SearchValue);

// Analyse response and change result (possible values: SUCCESS, FAILED, [])
  internalLink.PossibleResults = ValidateSearchResult(response);
}

// Http Request
public async Task<ResponseModel> SearchValue(string value) 
{
  // RestSharp API creation and headers addition
  var response = await client.ExecuteTaskAsync(request);

  return JsonConvert.DeserializeObject<ResponseModel>(response.Content);
}

Ответы [ 2 ]

2 голосов
/ 12 октября 2019

Кажется, что у вас есть ряд заданий ввода-вывода и ЦП, которые вам нужно выполнять одно за другим, с разной степенью параллелизма, необходимого для каждого шага. Хорошим инструментом для работы с такого рода нагрузками является библиотека TPL Dataflow . Эта библиотека разработана таким образом, что позволяет формировать конвейеры (или даже сложные сети) данных, которые передаются из одного блока в другой. Я попытался придумать пример, демонстрирующий использование этой библиотеки, а затем понял, что ваш рабочий процесс включает в себя последний шаг, где должно быть обновлено свойство (internalLink.PossibleResults), которое относится к первому типу элемента, поступающего в конвейер. Это сильно усложняет ситуацию, поскольку подразумевает, что первый тип должен переноситься по всем этапам конвейера. Возможно, самый простой способ сделать это - использовать ValueTuple s в качестве входных и выходных блоков. Это сделало бы мой пример слишком запутанным, поэтому я предпочел сохранить его в простейшем виде, поскольку его цель в основном состоит в демонстрации возможностей библиотеки потока данных TPL:

var cts = new CancellationTokenSource();
var restClient = new RestClient();

var block1 = new TransformBlock<InternalLinksModel, RestResponse>(async item =>
{
    return await restClient.ExecuteTaskAsync(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10, // 10 concurrent REST requests max
    CancellationToken = cts.Token, // Cancel at any time
});

var block2 = new TransformBlock<RestResponse, ResponseModel>(item =>
{
    return JsonConvert.DeserializeObject<ResponseModel>(item.Content);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2, // 2 threads max for this CPU bound job
    CancellationToken = cts.Token, // Cancel at any time
});

var block3 = new TransformBlock<ResponseModel, string>(async item =>
{
    return await SearchValue(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10, // Concurrency 10 for this I/O bound job
    CancellationToken = cts.Token, // Cancel at any time
});

var block4 = new ActionBlock<string>(item =>
{
    ValidateSearchResult(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 1, // 1 thread max for this CPU bound job
    CancellationToken = cts.Token, // Cancel at any time
});

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });

var internalLinks = new List<InternalLinksModel>();
// Populate internalLinks with the data
foreach (var internalLink in internalLinks)
{
    await block1.SendAsync(internalLink);
}
block1.Complete();

await block4.Completion;

Используются два типа блоковв этом примере TransformBlock и ActionBlock. ActionBlock обычно является последним блоком конвейера, так как он не производит никакого вывода. В случае, если ваша рабочая нагрузка слишком гранулированная, и накладные расходы, связанные с передачей объектов, сравнимы с самой рабочей нагрузкой, вы можете запустить конвейер с BatchBlock, а затем обработать следующие шаги партиями,скажем, 10 элементов каждый. Похоже, что это не требуется в вашем случае, поскольку создание веб-запросов и анализ ответов JSON - довольно громоздкая работа.

1 голос
/ 11 октября 2019

async / await / Когда все правильно, узкое место в производительности скорее всего связано с вводом / выводом (HTTP-запросы), а не с вычислением. Асинхронность является подходящим инструментом для решения этой проблемы. Сколько HTTP-запросов вы делаете и все они на один сервер? Если это так, возможно, вы достигли лимита подключения. Я не очень знаком с RestSharp, но вы можете попытаться увеличить лимит соединения через ServicePointManager. Чем больше невыполненных запросов у вас, при условии, что сервер может их обработать, тем быстрее будет выполнен процесс WhenAll.

https://docs.microsoft.com/en-us/dotnet/api/system.net.servicepointmanager?view=netframework-4.8

Все это говорит о том, что я реорганизовал бы ваш код. Используйте Task / WhenAll для ваших HTTP-запросов. И обработайте ответы после завершения WhenAll. Если вы сделаете это, вы можете с уверенностью определить, являются ли узкие места HTTP-запросами, установив точку останова после WhenAll, наблюдая время выполнения. Если вы не можете отладить, вы можете записать время выполнения. Это должно дать вам представление о том, является ли узким местом в первую очередь сетевой ввод-вывод. Я вполне уверен, что это так.

Если выяснится, что существует узкое место в вычислениях, вы можете использовать цикл Parallel.ForEach для десериализации, проверки и назначения.

            var internalLinks = new List<InternalLinksModel>();
            // Populate InternalLinks with the data
            // I'm assuming this means internalLinks is assumed to contain data. If not I'm not sure I understand your code.
            var dictionary = new Dictionary<Task, InternalLinksModel>(); //You shouldn't need a concurrent dictionary since you'll only be doing reads in parallel.

            //make api calls - I/O bound
            foreach (var l in internalLinks)
            {
                dictionary[client.ExecuteTaskAsync(l.SearchValue)] = l;
            }

            await Task.WhenAll(dictionary.Keys);    
            // I/O is done.

            // Compute bound - deserialize, validate, assign.
            Parallel.ForEach(dictionary.Keys, (task) =>
            {
                var responseModel = JsonConvert.DeserializeObject<ResponseModel>(task.Result.Content);
                dictionary[task].PossibleResults = ValidateSearchResult(responseModel);
            });


            // Writes results to txt file
            WriteResults(dictionary.Values, "Internal Links");
...