Что делает MaxDegreeOfParallelism? - PullRequest
47 голосов
/ 02 марта 2012

Я использую Parallel.ForEach и делаю некоторые обновления базы данных, теперь без установки MaxDegreeOfParallelism, двухъядерный процессор приводит к тайм-аутам клиента sql, тогда как в противном случае четырехъядерный процессор как-то не срабатывает.

Теперь я не могу контролировать, какие типы процессорных ядер доступны при запуске моего кода, но есть ли некоторые параметры, которые я могу изменить с помощью MaxDegreeOfParallelism, который, вероятно, будет выполнять меньше операций одновременно и не приведет к тайм-аутам?

Я могу увеличитьтайм-ауты, но это не очень хорошее решение, если на более низком процессоре я могу обрабатывать меньше операций одновременно, это приведет к меньшей нагрузке на процессор.

Хорошо. Я прочитал все остальные посты и MSDN тоже, но установив MaxDegreeOfParallelism для снижениязначение, от которого страдают мои четырехъядерные машины?

Например, есть ли в любом случае что-то подобное: если у процессора два ядра, тогда используйте 20, если у процессора четыре ядра, то 40?

Ответы [ 5 ]

66 голосов
/ 02 марта 2012

Ответ заключается в том, что это верхний предел для всей параллельной работы, независимо от количества ядер.

Таким образом, даже если вы не используете процессор из-за ожидания ввода-вывода или блокировки, никакие дополнительные задачи не будут выполняться параллельно, только максимум, который вы укажете.

Чтобы выяснить это, я написал этот фрагмент тестового кода. Там есть искусственный замок, чтобы стимулировать TPL использовать больше потоков. То же самое произойдет, когда ваш код ожидает ввода-вывода или базы данных.

class Program
{
    static void Main(string[] args)
    {
        var locker = new Object();
        int count = 0;
        Parallel.For
            (0
             , 1000
             , new ParallelOptions { MaxDegreeOfParallelism = 2 }
             , (i) =>
                   {
                       Interlocked.Increment(ref count);
                       lock (locker)
                       {
                           Console.WriteLine("Number of active threads:" + count);
                           Thread.Sleep(10);
                        }
                        Interlocked.Decrement(ref count);
                    }
            );
    }
}

Если я не укажу MaxDegreeOfParallelism, ведение журнала консоли показывает, что одновременно выполняется до 8 задач. Как это:

Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7

Начинается ниже, увеличивается со временем и в конце пытается одновременно запустить 8.

Если я ограничу его каким-либо произвольным значением (скажем, 2), я получу

Number of active threads:2
Number of active threads:1
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2

О, и это на четырехъядерной машине.

15 голосов
/ 26 июля 2013

Например, есть ли в любом случае что-то вроде того, если у процессора два ядра, тогда используйте 20, если у процессора четыре ядра, то 40?

Это можно сделать, чтобы параллелизм зависел от количества ядер ЦП:

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
Parallel.ForEach(sourceCollection, options, sourceItem =>
{
    // do something
});

Однако, новые процессоры, как правило, используют гиперпоточность для имитации дополнительных ядер. Так что если у вас четырехъядерный процессор, то Environment.ProcessorCount, вероятно, сообщит об этом как 8 ядер. Я обнаружил, что если вы установите параллелизм для учета симулированных ядер, то он на самом деле замедляет другие потоки, такие как потоки пользовательского интерфейса.

Таким образом, хотя операция завершится немного быстрее, пользовательский интерфейс приложения может испытывать значительные задержки в течение этого времени. Деление «Environment.ProcessorCount» на 2, по-видимому, обеспечивает одинаковую скорость обработки, но при этом процессор остается доступным для потоков пользовательского интерфейса.

1 голос
/ 09 февраля 2019

Что еще нужно учитывать, особенно для тех, кто обнаружил это много лет спустя, в зависимости от вашей ситуации, как правило, лучше всего собрать все данные в DataTable, а затем использовать SqlBulkCopy в конце каждой важной задачи.

Например, у меня есть процесс, который я выполняю, который проходит через миллионы файлов, и я сталкивался с такими же ошибками, когда каждая файловая транзакция выполняла запрос БД для вставки записи.Вместо этого я перешел к сохранению всего этого в DataTable в памяти для каждого общего ресурса, через который я перебрал, выгружая DataTable в мой SQL Server и очищая его между каждым отдельным ресурсом.Массовая вставка занимает доли секунды и позволяет не открывать тысячи соединений одновременно.

РЕДАКТИРОВАТЬ: Вот быстрый и грязный рабочий пример Метод SQLBulkCopy:

private static void updateDatabase(DataTable targetTable)
    {
        try
        {
            DataSet ds = new DataSet("FileFolderAttribute");
            ds.Tables.Add(targetTable);
            writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Opening SQL connection");
            SqlConnection sqlConnection = new SqlConnection(sqlConnectionString);
            sqlConnection.Open();
            SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
            bulkCopy.DestinationTableName = "FileFolderAttribute";
            writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Copying data to SQL Server table");
            foreach (var table in ds.Tables)
            {
                writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(table.ToString());
            }
            bulkCopy.WriteToServer(ds.Tables[0]);

            sqlConnection.Close();
            sqlConnection.Dispose();
            writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Closing SQL connection");
            Console.WriteLine(@"Clearing local DataTable...");
            targetTable.Clear();
            ds.Tables.Remove(targetTable);
            ds.Clear();
            ds.Dispose();
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

...и для вывода его в таблицу данных:

private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
    {
        try
        {
            if (tableToggle)
            {
                DataRow toInsert = results_1.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_1.Rows.Add(toInsert);
            }
            else
            {
                DataRow toInsert = results_2.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_2.Rows.Add(toInsert);
            }


        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logFile);
        }
    }

... и вот контекст, сам цикл:

private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
    {
        DateTime StartTime = DateTime.Now;
        int directoryCount = 0;
        int fileCount = 0;
        try
        {                
            manageDataTables();

            Console.WriteLine(rootDirectory.FullName);
            writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);

            applicationsDirectoryCount++;

            // REPORT DIRECTORY INFO //
            string directoryOwner = "";
            try
            {
                directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
            }
            catch (Exception error)
            {
                //writeToLog("\t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                errorLogging(error, getCurrentMethod(), logFile);
                directoryOwner = "SeparatedUser";
            }

            writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
            //writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
            writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);

            if (rootDirectory.GetDirectories().Length > 0)
            {
                Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
                {
                    directoryCount++;
                    Interlocked.Increment(ref threadCount);
                    processTargetDirectory(dir, targetPathRoot);
                });

            }

            // REPORT FILE INFO //
            Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
            {
                applicationsFileCount++;
                fileCount++;
                Interlocked.Increment(ref threadCount);
                processTargetFile(file, targetPathRoot);
            });

        }
        catch (Exception error)
        {
            writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
            errorLogging(error, getCurrentMethod(), logFile);
        }
        finally
        {
            Interlocked.Decrement(ref threadCount);
        }

        DateTime EndTime = DateTime.Now;
        writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
    }

Как отмечено выше, это быстро и грязно,но работает очень хорошо.

Из-за проблем с памятью, с которыми я столкнулся, когда получил около 2 000 000 записей, мне пришлось создать второй объект DataTable и чередовать их между двумя, дампируя записи на сервер SQL между чередованием.Так что мои SQL-соединения состоят из 1 на каждые 100 000 записей.

Мне удалось это так:

private static void manageDataTables()
    {
        try
        {
            Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
            if (tableToggle)
            {
                int rowCount = 0;
                if (results_1.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_1.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_1.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_1.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_1);
                    results_1.Clear();
                    writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }

            }
            else
            {
                int rowCount = 0;
                if (results_2.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_2.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_2.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_2.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_2);
                    results_2.Clear();
                    writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }
            }
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

Где "datatableRecordCountThreshhold = 100000"

1 голос
/ 02 марта 2012

Похоже, что код, который вы выполняете параллельно, является взаимоблокировкой, что означает, что, если вы не можете найти и устранить проблему, которая его вызывает, вы не должны распараллеливать ее вообще.

0 голосов
/ 02 марта 2012

устанавливает количество потоков для параллельной работы ...

...