Почему BlockingCollection <T>замедляется при добавлении к нему дополнительных потоков? - PullRequest
0 голосов
/ 28 мая 2020

Я занимаюсь профилированием коллекции BlockingCollection с целью использования ее в сценарии обработки данных UDP. В настоящее время он используется только для одного потребителя: входящие данные UDP записываются в коллекцию, а затем обрабатываются соответствующим образом.

Я думал, что сделаю профилирование нескольких потребителей / задач для повышения производительности, но я вижу некоторые странные результаты, поэтому мне что-то не хватает.

К вашему сведению, токен отмены не используется для профилирования.

Суть в том, чтобы поставить в очередь 1000000 номеров, а затем вытащить их из разных потоков, которые я хотя бы увеличила производительность, но имеет обратный эффект.

Вот основная c настройка для тестирования / профилирования (консольное приложение)

    static BlockingCollection<int> Queue = new BlockingCollection<int>();

     static void Main(string[] args) {

        m_tokenProcessData = new CancellationTokenSource();

        m_cancellationToken = m_tokenProcessData.Token;

        PrepareQueue();

        StartTasks(1);

        Console.ReadKey();

    }

    static void PrepareQueue() {

        for (int i = 0; i <= 1000000; i++) {

            Queue.Add(i);

        }

    }

     static void StartTasks( int maxTasks ) {

        m_startTime = DateTime.Now;

        for(int i=0; i<=maxTasks; i++ ) {

            Task.Factory.StartNew(() => ProcessData(), m_cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        }

    }

    static void ProcessData( ) {

        foreach( var number in Queue.GetConsumingEnumerable() ) {

            Task.Delay(10);

            var test = Queue.Count;

            if (test == 0) Finish();

        }

    }

     static void Finish() {

        var endTime = DateTime.Now;

        var timeTaken = (endTime - m_startTime).TotalMilliseconds;

        Console.WriteLine($"Processing Took {timeTaken}ms");

    }

Task.Delay (10) только там, чтобы смоделировать выполняемую работу.

Результаты тестирования

1 Task = 3217ms
2 Tasks = 3178ms
4 Tasks = 3365ms
8 Tasks = 3986ms
16 Tasks = 4380ms
32 Tasks = 3954ms
64 Tasks = 4854ms

Может ли кто-нибудь помочь в том, что я могу упустить / не понять?

Спасибо,

Даниил.

1 Ответ

2 голосов
/ 28 мая 2020

BlockingCollection<T> является потокобезопасным компонентом и содержит примитив синхронизации (объект SemaphoreSlim). Я предполагаю, что чем больше потоков вы кидаете на него, тем больше времени семафор должен ждать. См. Строка 431 в исходном коде BlockingCollection.

Добавление большего количества потоков к проблеме также увеличивает накладные расходы. Часто бывает, что оптимальным решением будет не больше потоков. В зависимости от сценария производителя / потребителя оптимальное количество потоков часто оказывается количеством ядер, которые у вас есть на вашем компьютере, , потому что помимо этого потоки просто выполняют переключение задач.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...