Реализация многопоточности в C # (обзор кода) - PullRequest
6 голосов
/ 28 сентября 2008

Привет.

Я пытаюсь реализовать многопоточный код в приложении. Целью этого кода является проверка элементов, которые ему предоставляет база данных. Проверка может занять довольно много времени (от нескольких сотен мс до нескольких секунд), поэтому для каждого элемента этот процесс необходимо разбить на отдельные потоки.

База данных вначале может давать ей 20 или 30 элементов в секунду, но это начинает быстро уменьшаться, в конечном итоге достигая около 65K элементов за 24 часа, после чего приложение закрывается.

Мне бы хотелось, чтобы кто-нибудь более знающий мог взглянуть на мой код и посмотреть, есть ли очевидные проблемы. Никто из тех, с кем я работаю, не знает многопоточности, так что я действительно один на этот раз.

Вот код. Это довольно долго, но должно быть довольно ясно. Дайте мне знать, если у вас есть какие-либо отзывы или советы. Спасибо!

public class ItemValidationService
{
    /// <summary>
    /// The object to lock on in this class, for multithreading purposes.
    /// </summary>
    private static object locker = new object();

    /// <summary>Items that have been validated.</summary>
    private HashSet<int> validatedItems;

    /// <summary>Items that are currently being validated.</summary>
    private HashSet<int> validatingItems;

    /// <summary>Remove an item from the index if its links are bad.</summary>
    /// <param name="id">The ID of the item.</param>
    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if
            (
                !this.validatedItems.Contains(id) &&
                !this.validatingItems.Contains(id)
            ){
                ThreadPool.QueueUserWorkItem(sender =>
                {
                    this.Validate(id);
                });
            }
        }

    } // method

    private void Validate(int itemId)
    {
        lock (locker)
        {
            this.validatingItems.Add(itemId);
        }

        // *********************************************
        // Time-consuming routine to validate an item...
        // *********************************************

        lock (locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }

    } // method

} // class

Ответы [ 7 ]

4 голосов
/ 28 сентября 2008

Пул потоков является удобным выбором, если у вас есть легкий вес спорадическая обработка, не зависящая от времени. Однако я вспоминаю, что читал в MSDN, что он не подходит для крупномасштабной обработки такого рода.

Я использовал это для чего-то очень похожего на это и сожалею об этом. В последующих приложениях я придерживался подхода «рабочий поток», и мне намного приятнее тот уровень контроля, который у меня есть.

Мой любимый шаблон в модели с рабочим потоком - это создание главного потока, который содержит очередь элементов задач. Затем выделите кучу рабочих, которые вытаскивают элементы из этой очереди для обработки. Я использую блокирующую очередь, чтобы при отсутствии элементов рабочий процесс блокировался до тех пор, пока что-то не было помещено в очередь. В этой модели главный поток создает рабочие элементы из некоторого источника (дБ и т. Д.), А рабочие потоки их потребляют.

2 голосов
/ 28 сентября 2008

Я придерживаюсь идеи использования очереди блокировки и рабочих потоков. Вот реализация блокировки очереди, которую я использовал в прошлом с хорошими результатами: http://www.codeproject.com/KB/recipes/boundedblockingqueue.aspx

Что входит в вашу логику проверки? Если он в основном связан с процессором, я бы создал не более 1 рабочего потока на процессор / ядро ​​на коробке. Это скажет вам количество процессоров: Environment.ProcessorCount

Если ваша проверка включает ввод-вывод, такой как доступ к файлам или доступ к базе данных, тогда вы можете использовать несколько потоков больше, чем число процессоров.

1 голос
/ 29 сентября 2008

ThreadPool не может быть оптимальным для одновременного заклинивания в него. Вы можете исследовать верхние пределы своих возможностей и / или свернуть свои собственные.

Кроме того, в вашем коде существует условие гонки, если вы не ожидаете дублирования проверок. Звонок на

this.validatingItems.Add(itemId);

должно происходить в основном потоке (ValidateItem), а не в потоке пула потоков (метод Validate). Этот вызов должен произойти на линии до постановки в очередь рабочего элемента в пул.

Обнаружена худшая ошибка, если не проверить возвращение QueueUserWorkItem. Очередь может потерпеть неудачу, и почему она не вызывает исключений, является загадкой для всех нас. Если он возвращает false, вам нужно удалить элемент, который был добавлен в список validatingItems, и обработать ошибку (вероятно, throw исключение).

1 голос
/ 28 сентября 2008

Возможна логическая ошибка в коде, размещенном с вопросом, в зависимости от того, откуда берется идентификатор элемента в ValidateItem(int id). Зачем? Потому что, хотя вы правильно блокируете свои очереди validatingItems и validatedItems перед тем, как ставить рабочий элемент в очередь, вы не добавляете элемент в очередь validatingItems, пока новый поток не раскрутится. Это означает, что может быть промежуток времени, когда другой поток вызывает ValidateItem(id) с тем же идентификатором (если только он не выполняется в одном основном потоке).

Я бы добавил элемент в очередь validatingItems непосредственно перед его помещением в очередь.

Редактировать: также QueueUserWorkItem() возвращает логическое значение, поэтому вы должны использовать возвращаемое значение, чтобы убедиться, что элемент был поставлен в очередь, а затем добавьте его в очередь validatingItems.

1 голос
/ 28 сентября 2008

Будьте осторожны, QueueUserWorkItem может завершиться ошибкой

0 голосов
/ 29 сентября 2008

Вы также можете попробовать использовать CCR - Concurrency and Coordination Runtime. Он похоронен в Microsoft Robotics Studio, но предоставляет отличный API для подобных вещей.

Вам просто нужно создать «Порт» (по существу, очередь), подключить получателя (метод, который вызывается, когда ему что-то отправляют), а затем публиковать на нем рабочие элементы. CCR обрабатывает очередь и рабочий поток для ее запуска.

Вот видео на 9 канале о CCR.

Он очень высокопроизводительный и даже используется не для робототехники (Myspace.com использует его за кулисами для своей сети доставки контента).

0 голосов
/ 28 сентября 2008

Я бы беспокоился о производительности здесь. Вы указали, что база данных может выдавать 20–30 элементов в секунду, и проверка элемента может занять до нескольких секунд. Это может быть довольно большое количество потоков - используя ваши метрики, в худшем случае 60-90 потоков! Я думаю, вам нужно пересмотреть дизайн здесь. Майкл упомянул хороший образец. Использование очереди действительно помогает держать вещи под контролем и организованно. Семафор также может быть использован для управления количеством созданных потоков - то есть вы можете иметь максимально допустимое количество потоков, но при меньших нагрузках вам не обязательно создавать максимальное число, если меньшее количество получит работу - - т. е. ваш собственный размер пула может быть динамическим с ограничением.

При использовании пула потоков мне также сложнее отслеживать выполнение потоков из пула при выполнении ими работы. Так что, если это не огонь и забыть, я за более контролируемое исполнение. Я знаю, что вы упомянули, что ваше приложение завершает работу после того, как все элементы в 65 КБ завершены. Как вы отслеживаете свои потоки, чтобы определить, закончили ли они свою работу - то есть все работники, находящиеся в очереди, сделаны. Вы контролируете состояние всех элементов в HashSets? Я думаю, поставив в очередь свои элементы и заставив свои рабочие потоки поглощать эту очередь, вы можете получить больше контроля. Хотя это может происходить за счет дополнительных издержек с точки зрения сигнализации между потоками, чтобы указать, когда все элементы были поставлены в очередь, позволяя им выйти.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...