Parallel.ForEach быстрее, чем Task.WaitAll для задач ввода-вывода? - PullRequest
1 голос
/ 25 сентября 2019

У меня есть две версии моей программы, которые отправляют ~ 3000 HTTP GET-запросов на веб-сервер.

Первая версия основана на том, что я прочитал здесь .Это решение имеет смысл для меня, потому что создание веб-запросов является работой, связанной с вводом-выводом, и использование async / await вместе с Task.WhenAll или Task.WaitAll означает, что вы можете отправить 100 запросов одновременно, а затем ждать их всехзакончите перед отправкой следующих 100 запросов, чтобы не перегружать веб-сервер.Я был удивлен, увидев, что эта версия завершила всю работу за ~ 12 минут - намного медленнее, чем я ожидал.

Вторая версия отправляет все 3000 HTTP-запросов GET внутри Parallel.ForEachпетля.Я использую .Result, чтобы дождаться завершения каждого запроса, прежде чем сможет выполнить остальная часть логики в этой итерации цикла.Я думал, что это будет гораздо менее эффективным решением, так как использование потоков для параллельного выполнения задач обычно лучше подходит для выполнения работы с ограничением ресурсов процессора, но я был удивлен, увидев, что эта версия выполнила всю работу за ~ 3 минуты!

Мой вопрос: почему версия Parallel.ForEach быстрее?Это стало дополнительным сюрпризом, потому что когда я применил те же две техники к другому API / веб-серверу, версия 1 моего кода была фактически быстрее, чем версия 2, примерно на 6 минут -чего я и ожидал.Может ли производительность двух разных версий иметь какое-то отношение к тому, как веб-сервер обрабатывает трафик?

Вы можете увидеть упрощенную версию моего кода ниже:

private async Task<ObjectDetails> TryDeserializeResponse(HttpResponseMessage response)
{
    try
    {
        using (Stream stream = await response.Content.ReadAsStreamAsync())
        using (StreamReader readStream = new StreamReader(stream, Encoding.UTF8))
        using (JsonTextReader jsonTextReader = new JsonTextReader(readStream))
        {
            JsonSerializer serializer = new JsonSerializer();
            ObjectDetails objectDetails = serializer.Deserialize<ObjectDetails>(
                jsonTextReader);
            return objectDetails;
        }
    }
    catch (Exception e)
    {
        // Log exception
        return null;
    }
}

private async Task<HttpResponseMessage> TryGetResponse(string urlStr)
{
    try
    {
        HttpResponseMessage response = await httpClient.GetAsync(urlStr)
            .ConfigureAwait(false);
        if (response.StatusCode != HttpStatusCode.OK)
        {
            throw new WebException("Response code is "
                + response.StatusCode.ToString() + "... not 200 OK.");
        }
        return response;
    }
    catch (Exception e)
    {
        // Log exception
        return null;
    }
}

private async Task<ListOfObjects> GetObjectDetailsAsync(string baseUrl, int id)
{
    string urlStr = baseUrl + @"objects/id/" + id + "/details";

    HttpResponseMessage response = await TryGetResponse(urlStr);

    ObjectDetails objectDetails = await TryDeserializeResponse(response);

    return objectDetails;
}

// With ~3000 objects to retrieve, this code will create 100 API calls
// in parallel, wait for all 100 to finish, and then repeat that process
// ~30 times. In other words, there will be ~30 batches of 100 parallel
// API calls.
private Dictionary<int, Task<ObjectDetails>> GetAllObjectDetailsInBatches(
    string baseUrl, Dictionary<int, MyObject> incompleteObjects)
{
    int batchSize = 100;
    int numberOfBatches = (int)Math.Ceiling(
        (double)incompleteObjects.Count / batchSize);
    Dictionary<int, Task<ObjectDetails>> objectTaskDict
        = new Dictionary<int, Task<ObjectDetails>>(incompleteObjects.Count);

    var orderedIncompleteObjects = incompleteObjects.OrderBy(pair => pair.Key);

    for (int i = 0; i < 1; i++)
    {
        var batchOfObjects = orderedIncompleteObjects.Skip(i * batchSize)
            .Take(batchSize);
        var batchObjectsTaskList = batchOfObjects.Select(
            pair => GetObjectDetailsAsync(baseUrl, pair.Key));
        Task.WaitAll(batchObjectsTaskList.ToArray());
        foreach (var objTask in batchObjectsTaskList)
            objectTaskDict.Add(objTask.Result.id, objTask);
    }

    return objectTaskDict;
}

public void GetObjectsVersion1()
{
    string baseUrl = @"https://mywebserver.com:/api";

    // GetIncompleteObjects is not shown, but it is not relevant to
    // the question
    Dictionary<int, MyObject> incompleteObjects = GetIncompleteObjects();

    Dictionary<int, Task<ObjectDetails>> objectTaskDict
        = GetAllObjectDetailsInBatches(baseUrl, incompleteObjects);

    foreach (KeyValuePair<int, MyObject> pair in incompleteObjects)
    {
        ObjectDetails objectDetails = objectTaskDict[pair.Key].Result
            .objectDetails;

        // Code here that copies fields from objectDetails to pair.Value
        // (the incompleteObject)

        AllObjects.Add(pair.Value);
    };
}

public void GetObjectsVersion2()
{
    string baseUrl = @"https://mywebserver.com:/api";

    // GetIncompleteObjects is not shown, but it is not relevant to
    // the question
    Dictionary<int, MyObject> incompleteObjects = GetIncompleteObjects();

    Parallel.ForEach(incompleteHosts, pair =>
    {
        ObjectDetails objectDetails = GetObjectDetailsAsync(
            baseUrl, pair.Key).Result.objectDetails;

        // Code here that copies fields from objectDetails to pair.Value
        // (the incompleteObject)

        AllObjects.Add(pair.Value);
    });
}

Ответы [ 3 ]

1 голос
/ 25 сентября 2019

Вкратце:

  • Parallel.Foreach() наиболее полезно для задач, связанных с процессором.
  • Task.WaitAll() более полезно для задач, связанных с вводом-выводом.

Итак, в вашем случае вы получаете информацию с веб-серверов, то есть IO.Если асинхронные методы реализованы правильно, он не заблокирует ни одного потока. (он будет использовать порты завершения ввода-вывода для ожидания) Таким образом, потоки могут делать другие вещи.

Запустив асинхронные методы GetObjectDetailsAsync(baseUrl, pair.Key).Result синхронно, он заблокирует поток.Таким образом, поток будет заполнен ожидающими потоками.

Так что я думаю, что решение задачи будет лучше соответствовать.

0 голосов
/ 25 сентября 2019

Возможная причина, по которой Parallel.ForEach может работать быстрее, заключается в том, что это создает побочный эффект регулирования.Первоначально потоки x обрабатывают первые элементы x (где x в количестве доступных ядер), и постепенно может быть добавлено больше потоков в зависимости от внутренней эвристики.Регулирование операций ввода-вывода - это хорошо, потому что оно защищает сеть и сервер, который обрабатывает запросы, от перегрузки.Ваш альтернативный импровизированный метод регулирования путем отправки запросов партиями по 100 далек от идеала по многим причинам, одна из которых заключается в том, что 100 одновременных запросов - это много запросов!Другая причина заключается в том, что одна длительная операция может задержать завершение пакета до тех пор, пока не завершатся остальные 99 операций.

Обратите внимание, что Parallel.ForEach также не идеален для распараллеливания операций ввода-вывода.Просто оказалось, что он работает лучше, чем альтернатива, тратя впустую всю память.Для лучших подходов смотрите здесь: Как ограничить количество одновременных операций асинхронного ввода-вывода?

0 голосов
/ 25 сентября 2019

https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreach?view=netframework-4.8

По сути, forera parralel позволяет выполнять итерации параллельно, поэтому вы не ограничиваете итерацию для запуска в последовательном режиме на хосте, который не ограничен потоком, это приведет к повышению пропускной способности

...