Parallel.ForEach ведет себя как обычное для каждого к концу итерации - PullRequest
3 голосов
/ 02 февраля 2011

У меня возникает эта проблема, когда я запускаю что-то вроде этого:

Parallel.ForEach(dataTable.AsEnumerable(), row =>
{
   //do processing
}

Предполагая, что существует более 500 записей, скажем, 870. После того, как Parallel.ForEach завершит выполнение 850, кажется, что он работает последовательно, то есть только 1 операция за раз. Он выполнил 850 операций очень быстро, но когда он приближается к концу итерации, он становится очень медленным и, кажется, работает как обычный для каждой. Я даже попробовал на 2000 записей.

Что-то не так в моем коде? Пожалуйста, дайте предложения.

Ниже приведен код, который я использую

Извините, я только что опубликовал неправильный пример. Это правильный код:

Task newTask = Task.Factory.StartNew(() =>
{
    Parallel.ForEach(dtResult.AsEnumerable(), dr =>
    {
        string extractQuery = "";
        string downLoadFileFullName = "";
        lock (foreachObject)
        {

            string fileName = extractorConfig.EncodeFileName(dr);
            extractQuery = extractorConfig.GetExtractQuery(dr);
            if (string.IsNullOrEmpty(extractQuery)) throw new Exception("Extract Query not found. Please check the configuration");

            string newDownLoadPath = CommonUtil.GetFormalizedDataPath(sDownLoadPath, uKey.CobDate);
            //create folder if it doesn't exist
            if (!Directory.Exists(newDownLoadPath)) Directory.CreateDirectory(newDownLoadPath);
            downLoadFileFullName = Path.Combine(newDownLoadPath, fileName);
        }
        Interlocked.Increment(ref index);

        ExtractorClass util = new ExtractorClass(SourceDbConnStr);
        util.LoadToFile(extractQuery, downLoadFileFullName);
        Interlocked.Increment(ref uiTimerIndex);
    });
});

Ответы [ 3 ]

3 голосов
/ 02 февраля 2011

Мое предположение:

Это похоже на высокую степень потенциального ввода-вывода от:

  • База данных + диск
  • Сетевое взаимодействие с БД и обратно
  • Запись результатов на диск

Поэтому много времени будет потрачено на ожидание ввода-вывода. Я предполагаю, что ожидание только ухудшается, поскольку в микс добавляется все больше потоков, а IO еще больше усиливается. Например, диск имеет только один набор головок, поэтому вы не можете записывать на него одновременно. Если у вас большое количество потоков, пытающихся писать одновременно, производительность ухудшается.

Попробуйте ограничить максимальное количество потоков, которые вы используете:

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

Parallel.ForEach(dtResult.AsEnumerable(), options, dr =>
{
    //Do stuff
});

Обновление

После редактирования вашего кода я бы предложил следующее, в котором есть несколько изменений:

  • Уменьшите максимальное количество потоков - с этим можно поэкспериментировать.
  • Выполнять проверку и создание каталога только один раз.

Код:

private static bool isDirectoryCreated;

//...

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

Parallel.ForEach(dtResult.AsEnumerable(), options, dr =>
{
    string fileName, extractQuery, newDownLoadPath;

    lock (foreachObject)
    {
        fileName = extractorConfig.EncodeFileName(dr);

        extractQuery = extractorConfig.GetExtractQuery(dr);

        if (string.IsNullOrEmpty(extractQuery))
            throw new Exception("Extract Query not found. Please check the configuration");

        newDownLoadPath = CommonUtil.GetFormalizedDataPath(sDownLoadPath, uKey.CobDate);

        if (!isDirectoryCreated)
        {
            if (!Directory.Exists(newDownLoadPath))
                Directory.CreateDirectory(newDownLoadPath);

            isDirectoryCreated = true;
        }
    }

    string downLoadFileFullName = Path.Combine(newDownLoadPath, fileName);

    Interlocked.Increment(ref index);

    ExtractorClass util = new ExtractorClass(SourceDbConnStr);
    util.LoadToFile(extractQuery, downLoadFileFullName);

    Interlocked.Increment(ref uiTimerIndex);
});
2 голосов
/ 02 февраля 2011

Трудно дать подробности без соответствующего кода, но в целом это ожидаемое поведение. .NET пытается планировать задачи так, чтобы каждый процессор был равномерно занят.

Но это может быть только приблизительное значение, если не все задачи занимают одинаковое количество времени. В конце некоторые процессоры будут работать, а некоторые - нет, и перераспределение работы будет дорогостоящим и не всегда выгодным.

Я не знаю подробностей о балансировке нагрузки, используемой PLinq, но суть в том, что такое поведение никогда нельзя полностью предотвратить.

1 голос
/ 03 февраля 2011

Предположим, что вы ограничиваете параллельность двумя потоками.Есть (как минимум) два возможных способа, которыми Parallel.ForEach может потенциально работать.Одним из способов может быть запуск двух потоков, и каждому дается половина элементов для завершения.Таким образом, если у вас есть 850 предметов, то в действительности получится, что поток 1 получает первые 425 элементов, а поток 2 получает второй блок из 425 элементов.Теперь оба потока идут на работу.Порядок обработки предметов будет примерно таким: [0, 425, 426, 1, 2, 427, 3, 428, 429, 4, ...].

Это вполне возможно (вероятно, на самом деле) что один из потоков выполнит свою группу элементов намного быстрее, чем другой.

Еще один способ, которым он может работать, - запустить два потока, каждый из которых должен получить элемент из списка, обработать его изатем получите следующий элемент, повторяя до тех пор, пока не останется элементов для обработки.В этом случае порядок обработки элементов будет больше похож на [0, 1, 2, 4, 3, 6, 5, ...].

В первом примере каждому потоку дается блокэлементов для обработки.Во втором случае каждый поток обрабатывает элементы из общего блока до тех пор, пока не останется элементов.

Существуют варианты, но это два основных способа разделения работы между несколькими потоками.Либо дайте каждому свою собственную группу элементов, либо ожидайте, что каждый поток запросит следующий элемент после того, как он закончит обработку.

Parallel.ForEach реализован первым способом: каждый поток получает свою собственную группуэлементы для обработки.Выполнение этого другим способом потребовало бы дополнительных затрат, поскольку список элементов должен обрабатываться как общая очередь, что приводит к дополнительным издержкам синхронизации.

...