C # шаблон потока, который позволит мне промыть - PullRequest
4 голосов
/ 19 марта 2010

У меня есть класс, который реализует шаблон Begin / End Invocation, где я изначально использовал ThreadPool.QueueUserWorkItem () для потоковой работы. Работа, выполняемая в потоке, не зацикливается, но занимает немного времени для обработки, поэтому сама работа не может быть легко остановлена.

Теперь у меня есть побочный эффект, когда кто-то, использующий мой класс, вызывает Begin (с обратным вызовом) множество раз, чтобы выполнить большую обработку, поэтому ThreadPool.QueueUserWorkItem создает тонну потоков для выполнения обработки. Это само по себе неплохо, но есть случаи, когда они хотят отказаться от обработки и начать новый процесс, но вынуждены ждать завершения своего первого запроса.

Поскольку ThreadPool.QueueUseWorkItem () не позволяет мне отменять потоки, я пытаюсь найти лучший способ поставить работу в очередь и, возможно, использовать явный метод FlushQueue () в моем классе, чтобы позволить вызывающей стороне отказаться от работы в моей очереди.

У кого-нибудь есть предложения по шаблону потоков, которые соответствуют моим потребностям?

Редактировать: В настоящее время я нацеливаюсь на структуру 2.0. В настоящее время я думаю, что очередь Consumer / Producer может работать. У кого-нибудь есть идеи по поводу очистки очереди?

Редактировать 2 Разъяснение проблемы: Так как я использую шаблон Begin / End в своем классе каждый раз, когда вызывающая сторона использует Begin with callback, я создаю новый поток в пуле потоков. Этот вызов выполняет очень небольшую обработку и не там, где я хочу отменить. Это незавершенные задания в очереди, которые я хочу остановить.

Тот факт, что ThreadPool по умолчанию создаст 250 потоков на процессор, означает, что если вы попросите ThreadPool поставить в очередь большое количество элементов с помощью QueueUserWorkItem (), вы в конечном итоге создадите огромное количество параллельных потоков, которые невозможно остановить. .

Из-за способа, которым я поставил в очередь потоки, вызывающая сторона способна довести процессор до 100% не только работой, но и созданием работы.

Я думал, используя шаблон Producer / Consumer, я мог бы поставить эти потоки в очередь в свою собственную очередь, что позволило бы мне модерировать количество создаваемых мной потоков, чтобы избежать скачка ЦП при создании всех параллельных потоков. И что я мог бы позволить вызывающей стороне моего класса сбрасывать все задания в очереди, когда они оставляют запросы.

В настоящее время я пытаюсь реализовать это сам, но подумал, что было бы неплохо, чтобы кто-то сказал, взглянуть на этот код, иначе вы не сможете сбросить из-за этого, или сброс не подходит для этого термина.

Ответы [ 4 ]

1 голос
/ 19 марта 2010

Метод, который я использовал в прошлом, хотя это, конечно, не лучшая практика - выделять экземпляр класса для каждого потока и иметь флаг прерывания в классе. Затем создайте метод ThrowIfAborting для класса, который периодически вызывается из потока (особенно, если поток выполняет цикл, просто вызывайте его при каждой итерации). Если флаг был установлен, ThrowIfAborting просто выдаст исключение, которое перехватывается в основном методе для потока. Просто убедитесь, что вы очищаете свои ресурсы во время прерывания.

1 голос
/ 19 марта 2010

РЕДАКТИРОВАТЬ Мой ответ не применяется, так как OP использует 2.0. Оставив и переключившись на CW для всех, кто читает этот вопрос и использует 4.0

Если вы используете C # 4.0 или можете полагаться на одну из более ранних версий параллельных платформ, вы можете использовать их встроенную поддержку отмены. Это не так просто, как отмена потока, но фреймворк гораздо надежнее (отмена потока очень привлекательна, но и очень опасна).

Рид сделал отличную статью на этот счет, вам стоит взглянуть на

0 голосов
/ 19 марта 2010

Вы можете расширить шаблон Begin / End, чтобы стать шаблоном Begin / Cancel / End. Метод Cancel может установить флаг cancel , который рабочий поток периодически опрашивает. Когда рабочий поток обнаруживает запрос отмены, он может остановить свою работу, при необходимости очистить ресурсы и сообщить, что операция была отменена как часть аргументов End.

0 голосов
/ 19 марта 2010

Я решил, что именно я считаю вашей проблемой, используя класс-оболочку для 1+ экземпляров BackgroundWorker.

К сожалению, я не могу опубликовать весь свой класс, но вот основная концепция и ее ограничения.

Использование : Вы просто создаете экземпляр и вызываете RunOrReplace (...), когда хотите отменить старого работника и начать новый. Если старый работник был занят, его просят отменить, а затем другой работник используется для немедленного выполнения вашего запроса.

public class BackgroundWorkerReplaceable : IDisposable
{

BackgroupWorker activeWorker = null;
object activeWorkerSyncRoot = new object();
List<BackgroupWorker> workerPool = new List<BackgroupWorker>();

DoWorkEventHandler doWork;
RunWorkerCompletedEventHandler runWorkerCompleted;

public bool IsBusy
{
    get { return activeWorker != null ? activeWorker.IsBusy; : false }
}

public BackgroundWorkerReplaceable(DoWorkEventHandler doWork, RunWorkerCompletedEventHandler runWorkerCompleted)
{
    this.doWork = doWork;
    this.runWorkerCompleted = runWorkerCompleted;
    ResetActiveWorker();
}

public void RunOrReplace(Object param, ...) // Overloads could include ProgressChangedEventHandler and other stuff
{
    try
    {
        lock(activeWorkerSyncRoot)
        {
            if(activeWorker.IsBusy)
            {
                ResetActiveWorker();
            }

            // This works because if IsBusy was false above, there is no way for it to become true without another thread obtaining a lock
            if(!activeWorker.IsBusy)
            {
                // Optionally handle ProgressChangedEventHandler and other features (under the lock!)

                // Work on this new param
                activeWorker.RunWorkerAsync(param);
            }
            else
            { // This should never happen since we create new workers when there's none available!
                throw new LogicException(...); // assert or similar
            }
        }
    }
    catch(...) // InvalidOperationException and Exception
    { // In my experience, it's safe to just show the user an error and ignore these, but that's going to depend on what you use this for and where you want the exception handling to be
    }
}

public void Cancel()
{
    ResetActiveWorker();
}

public void Dispose()
{ // You should implement a proper Dispose/Finalizer pattern
    if(activeWorker != null)
    {
        activeWorker.CancelAsync();
    }
    foreach(BackgroundWorker worker in workerPool)
    {
        worker.CancelAsync();
        worker.Dispose();
        // perhaps use a for loop instead so you can set worker to null?  This might help the GC, but it's probably not needed
    }
}

void ResetActiveWorker()
{
    lock(activeWorkerSyncRoot)
    {
        if(activeWorker == null)
        {
            activeWorker = GetAvailableWorker();
        }
        else if(activeWorker.IsBusy)
        { // Current worker is busy - issue a cancel and set another active worker
            activeWorker.CancelAsync(); // Make sure WorkerSupportsCancellation must be set to true [Link9372]
            // Optionally handle ProgressEventHandler -=
            activeWorker = GetAvailableWorker(); // Ensure that the activeWorker is available
        }
        //else - do nothing, activeWorker is already ready for work!
    }
}
BackgroupdWorker GetAvailableWorker()
{
    // Loop through workerPool and return a worker if IsBusy is false
    // if the loop exits without returning...
    if(activeWorker != null)
    {
        workerPool.Add(activeWorker); // Save the old worker for possible future use
    }
    return GenerateNewWorker();
}
BackgroundWorker GenerateNewWorker()
{
    BackgroundWorker worker = new BackgroundWorker();
    worker.WorkerSupportsCancellation = true; // [Link9372]
    //worker.WorkerReportsProgress
    worker.DoWork += doWork;
    worker.RunWorkerCompleted += runWorkerCompleted;
    // Other stuff
    return worker;
}

} // class

Pro / Con

Преимущество этого заключается в очень низкой задержке начала нового выполнения, поскольку новым потокам не нужно ждать завершения старых .

Это происходит за счет теоретического бесконечного роста объектов BackgroundWorker, которые никогда не получают GC'd. Однако на практике приведенный ниже код пытается утилизировать старых рабочих, поэтому обычно не нужно сталкиваться с большим количеством идеальных потоков. Если вас это беспокоит из-за того, как вы планируете использовать этот класс, вы можете реализовать Timer, который запускает метод CleanUpExcessWorkers (...), или же ResetActiveWorker () выполнит эту очистку (за счет более длинного RunOrReplace (. ..) задержка).

Основная цена от использования этого - именно поэтому она выгодна - она ​​не ждет завершения предыдущего потока, например, если DoWork выполняет вызов базы данных и вы запускаете RunOrReplace (...) 10 раз в Быстрая последовательность, вызов базы данных не может быть немедленно отменен, когда поток - так у вас будет 10 запущенных запросов, что сделает все они медленными! Как правило, это нормально работает с Oracle, вызывая лишь незначительные задержки, но у меня нет опыта работы с другими базами данных (чтобы ускорить очистку, у меня отмененный работник говорит Oracle отменить команду). Правильное использование EventArgs, описанного ниже, в основном решает эту проблему.

Другая незначительная стоимость - то, что любой код, выполняемый этим BackgroundWorker, должен быть совместимым с этой концепцией - он должен быть в состоянии безопасно восстанавливаться после отмены. DoWorkEventArgs и RunWorkerCompletedEventArgs имеют свойство Cancel / Cancelled, которое вы должны использовать. Например, если вы выполняете вызовы базы данных в методе DoWork (главным образом, для чего я использую этот класс), вам необходимо периодически проверять эти свойства и выполнять соответствующую очистку.

...