Улучшение скорости обработки с помощью задач - PullRequest
2 голосов
/ 27 октября 2019

У меня есть следующий код:

class Program
{
    class ProcessedEven
    {
        public int ProcessedInt { get; set; }

        public DateTime ProcessedValue { get; set; }
    }

    class ProcessedOdd
    {
        public int ProcessedInt { get; set; }

        public string ProcessedValue { get; set; }
    }

    static void Main(string[] args)
    {
        Stopwatch stopwatch = new Stopwatch();

        IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
        Dictionary<int, ProcessedOdd> processedOddValuesDictionary = new Dictionary<int, ProcessedOdd>();
        Dictionary<int, ProcessedEven> processedEvenValuesDictionary = new Dictionary<int, ProcessedEven>();

        stopwatch.Start();

        while (enumerator.MoveNext())
        {
            int currentNumber = enumerator.Current;

            if (currentNumber % 2 == 0)
            {
                Task.Run(() =>
                {
                    ProcessedEven processedEven =
                        new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                    await Task.Delay(100);

                    processedEvenValuesDictionary.Add(currentNumber, processedEven);
                });
            }
            else
            {
                Task.Run(() =>
                {
                    ProcessedOdd processedOdd =
                        new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                    await Task.Delay(100);

                    processedOddValuesDictionary.Add(currentNumber, processedOdd);
                });
            }
        }

        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

Так что в основном я должен перебирать перечислитель, который все время синхронизирован.

Как только текущее значение из итератора взято,обрабатывается, каким-то образом, что занимает много времени. После обработки в зависимости от его значения добавляется в словарь. Таким образом, в конце словари должны быть заполнены правильными значениями.

Чтобы повысить скорость, я подумал, что введение параллелизма может помочь, но после добавления «Task.Run» вызывает несколько

"System.NullReferenceException: 'Ссылка на объект не установлена ​​для экземпляра объекта"

возникли исключения. Также увеличилось время выполнения по сравнению с «синхронной» версией этого кода (та, что без вызова «Task.Run»).

Я не понимаю, почему возникают эти исключения, поскольку кажется, что все не такnull.

Есть ли способ повысить скорость в этом сценарии (исходный код без вызовов «Task.Run») с помощью многопоточности?

Следует ли добавлять обработанные элементысделать словари внутри оператора блокировки, так как словари, кажется, разделены между задачами?

Ответы [ 3 ]

3 голосов
/ 27 октября 2019

Вы создаете множество небольших задач и исчерпываете свой пул потоков, вызывая Task.Run. Вам лучше использовать Parallel.ForEach для лучшей производительности. И, как @ user1672994 сказал, что вы должны использовать поточно-ориентированную версию Dictionary - ConcurrentDictionary

static void Main(string[] args)
{
    Stopwatch stopwatch = new Stopwatch();

    IEnumerable<int> enumerable = Enumerable.Range(0, 100000);
    ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
    ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

    stopwatch.Start();

    Parallel.ForEach(enumerable,
        currentNumber =>
            {
                if (currentNumber % 2 == 0)
                {
                    ProcessedEven processedEven =
                        new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                    // Task.Delay(100);

                    processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
                }
                else
                {
                    ProcessedOdd processedOdd =
                        new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                    // Task.Delay(100);

                    processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
                }
            });

    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

    Console.ReadKey();
}

Я также не понимаю, зачем вам нужен Task.Delay(100) в вашем коде. В любом случае это асинхронная операция, которая без оператора await сделает то, чего вы, вероятно, не ожидаете. Использование эфира в ожидании или использование версии синхронизации Thread.Sleep(100)

2 голосов
/ 28 октября 2019

Конкретная причина получения NullReferenceException заключается в том, что внутреннее состояние контейнера Dictionary повреждено. Вероятно, два потока пытались изменить размер двух внутренних массивов Dictionary параллельно, или что-то еще одинаково неприятное. На самом деле вам повезло, что вы получили эти исключения, потому что гораздо худшим результатом будет наличие работающей программы, которая дает неверные результаты.

Более общая причина этой проблемы заключается в том, что вы разрешили параллельный асинхронизированный доступ к потоку. небезопасные объекты. Класс Dictionary, как и большинство встроенных классов .NET, не является поточно-ориентированным. Это реализовано в предположении, что к нему будет обращаться один поток (или хотя бы один поток за раз). Он не содержит внутренней синхронизации. Причина заключается в том, что добавление синхронизации в класс влечет за собой сложность API и снижение производительности, и нет никаких причин платить эти издержки каждый раз, когда вы используете этот класс, когда это потребуется только в нескольких особых случаях.

Есть много решений вашей проблемы. Один из них - продолжать использовать небезопасный поток Dictionary, но обеспечить доступ к нему исключительно с помощью блокировок. Это наиболее гибкое решение, но вы должны быть очень осторожны, чтобы не допустить ни одного незащищенного пути кода к объекту. Доступ к каждому свойству и каждому методу с чтением или записи в него должен быть внутри lock. Так что это гибко, но хрупко, и может стать узким местом производительности в случае сильной конкуренции (т. Е. Слишком много потоков одновременно запрашивают эксклюзивную блокировку и вынуждены ждать в очереди).

Другое решение состоит в том, чтобыиспользуйте потокобезопасный контейнер, такой как ConcurrentDictionary. Этот класс гарантирует, что его внутреннее состояние никогда не будет повреждено при параллельном доступе нескольких потоков. К сожалению, это не гарантирует ничего из остального состояния вашей программы. Так что это подходит для некоторых простых случаев, когда у вас нет другого общего состояния, кроме как из самого словаря. В этих случаях он предлагает повышение производительности, поскольку он реализован с гранулярной внутренней блокировкой (имеется несколько блокировок, по одной на каждый сегмент данных).

Лучшее решение - полностью устранить необходимость синхронизации потоков, устраняя необходимостьобщее состояние. Просто дайте каждому потоку работать со своим внутренним изолированным подмножеством или данными, и объединяйте эти подмножества только после завершения всех потоков. Обычно это обеспечивает наилучшую производительность за счет необходимости разбить начальную рабочую нагрузку и затем написать окончательный код слияния. Есть библиотеки, которые следуют этой стратегии, но имеют дело со всем этим образцом, позволяя вам писать как можно меньше кода. Одна из лучших - это библиотека TPL Dataflow , которая фактически встроена в платформу .NET Core. Для .NET Framework вам необходимо установить пакет, чтобы использовать его.

2 голосов
/ 27 октября 2019

Вы должны использовать ConcurrentDictionary, который является потокобезопасным набором пар ключ / значение, к которым могут обращаться несколько потоков одновременно.

ConcurrentDictionary предназначен для многопоточных сценариев. Вам не нужно использовать блокировки в своем коде для добавления или удаления элементов из коллекции. Однако один поток всегда может извлечь значение, а другой поток - немедленно обновить коллекцию, присвоив этому ключу новое значение.

Когда я запускаю ваш код после изменения Dictionary на ConcurrentDictionary затем код выполняется без NullReferenceException и завершается через ~ 1,37 секунды.

Полный код:

    class Program
    {
        class ProcessedEven
        {
            public int ProcessedInt { get; set; }

            public DateTime ProcessedValue { get; set; }
        }

        class ProcessedOdd
        {
            public int ProcessedInt { get; set; }

            public string ProcessedValue { get; set; }
        }

        static void Main(string[] args)
        {
            Stopwatch stopwatch = new Stopwatch();

            IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
            ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
            ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

            stopwatch.Start();

            while (enumerator.MoveNext())
            {
                int currentNumber = enumerator.Current;

                if (currentNumber % 2 == 0)
                {
                    Task.Run(() =>
                    {
                        ProcessedEven processedEven =
                            new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                        Task.Delay(100);

                        processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
                    });
                }
                else
                {
                    Task.Run(() =>
                    {
                        ProcessedOdd processedOdd =
                            new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                        Task.Delay(100);

                        processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
                    });
                }
            }

            stopwatch.Stop();

            Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

            Console.ReadKey();
        }
    }

enter image description here

...