Многопоточность рекомендации на основе описания программы - PullRequest
2 голосов
/ 10 марта 2010

Я хотел бы описать некоторые особенности моей программы и получить отзывы о том, какая модель многопоточности лучше всего использовать. Я потратил много времени на чтение ThreadPool, Threads, Producer / Consumer и т. Д., И до сих пор не пришел к твердым выводам.

У меня есть список файлов (все в одном формате), но с разным содержанием. Я должен выполнить работу над каждым файлом. Работа состоит в чтении файла, некоторой обработке, которая занимает около 1-2 минут обработки прямого числа, а затем в конце записи больших выходных файлов.

Я бы хотел, чтобы интерфейс пользовательского интерфейса оставался отзывчивым после начала работы с указанными файлами.

Некоторые вопросы:

  1. Какую модель / механизмы следует использовать? Производитель / Потребитель, WorkPool и т. Д.
  2. Должен ли я использовать BackgroundWorker в пользовательском интерфейсе для отзывчивости или я могу запускать потоки из формы, пока я оставляю поток пользовательского интерфейса в одиночестве, чтобы продолжать отвечать на вводимые пользователем данные?
  3. Как я могу получить результаты или статус каждой отдельной работы над каждым файлом и сообщить об этом в пользовательский интерфейс потокобезопасным способом, чтобы дать обратную связь с пользователем по мере выполнения работы (для обработки может быть около 1000 файлов)

Обновление:

Отличная обратная связь, очень полезная. Я добавляю еще несколько деталей, которые задаются ниже:

  • Вывод в несколько независимых файлов. Один набор выходных файлов для каждого «рабочего элемента», который затем сам читается и обрабатывается другим процессом до завершения «рабочего элемента»

  • Рабочие элементы / потоки не разделяют ресурсы.

  • Рабочие элементы обрабатываются частично с использованием неуправляемой статической библиотеки, которая использует библиотеки повышения.

Ответы [ 5 ]

1 голос
/ 10 марта 2010

Я бы не использовал фонового работника - который связывает вашу обработку со слоем пользовательского интерфейса Winform. Если вы хотите создать невизуальный класс, который обрабатывает потоки и обработку, лучше всего использовать Threadpool.

Я бы использовал Threadpool против "прямых" потоков, поскольку .Net будет выполнять некоторую балансировку нагрузки с пулом и перезапускать потоки, чтобы вам не пришлось нести расходы на создание потоков.

Если вы используете .Net 4, вы можете взглянуть на новую библиотеку параллельной потоковой обработки, я думаю, она оборачивает многие вещи производителя / потребителя.

Вы, вероятно, хотите использовать какую-то «дроссельную заслонку» для управления скоростью обработки файлов (вероятно, вы не хотите, чтобы все 1000 файлов загружались в память одновременно и т. Д.). Вы можете рассмотреть модель производителя / потребителя, где вы можете контролировать, сколько потоков обрабатывается одновременно.

Для потоковых обновлений обратно в пользовательский интерфейс используйте элементы InvokeRequired и Invoke / BeginInvoke на элементах управления Winforms.

Редактировать - пример кода Мой пример проще, чем у Лирика, но он тоже не так хорош. Если вам нужен полноценный производитель / потребитель, следуйте тому, что написал Лирик. Исходя из вашего вопроса, кажется, что вы хотите создать список файлов и передать их в какой-либо другой компонент и позволить этим файлам обрабатываться в фоновом режиме. Если это все, что вы хотите сделать, вам, вероятно, не нужен полноценный производитель / потребитель.

Я предполагаю, что это какая-то пакетная операция, и как только пользователь ее запустит, он не будет добавлять больше файлов, пока пакет не закончится. Если это не так, возможно, вам будет лучше с производителем / потребителем.

Этот пример можно использовать с Winform, но это не обязательно. Вы можете использовать этот компонент в службе, консольном приложении и т. Д .:


    public class FileProcessor
    {
        private int MaxThreads = System.Environment.ProcessorCount;
        private volatile int ActiveWorkers;

        // you could define your own handler here to pass completion stats
        public event System.EventHandler FileProcessed;

        public event System.EventHandler Finished;

        private readonly object LockObj = new object();
        private System.Collections.Generic.Queue Files;

        public void ProcessFiles(System.Collections.Generic.Queue files)
        {
            this.Files = files;
            for (int i = 0; i < this.MaxThreads; i++)
                System.Threading.ThreadPool.QueueUserWorkItem(this.ProcessFile);
        }

        private void ProcessFile(object state)
        {
            this.IncrementActiveWorkers();
            string file = this.DequeueNextFile();
            while (file != null)
            {
                this.DoYourWork(file);
                this.OnFileProcessed(file);
                file = this.DequeueNextFile();
            } 
            // no more files left in the queue
            int workers = this.DecrementActiveWorkers();
            if (workers == 0)
                this.OnFinished();
        }

        // please give me a name!
        private void DoYourWork(string fileName) { }

        private void IncrementActiveWorkers()
        {
            lock (this.LockObj)
            {
                this.ActiveWorkers++;
            }
        }

        private int DecrementActiveWorkers()
        {
            lock (this.LockObj)
            {
                this.ActiveWorkers--;
                return this.ActiveWorkers;
            }
        }

        private string DequeueNextFile()
        {
            lock (this.LockObj)
            {
                // check for items available in queue
                if (this.Files.Count > 0)
                    return this.Files.Dequeue();
                else
                    return null;
            }

        }

        private void OnFileProcessed(string fileName)
        {
            System.EventHandler fileProcessed = this.FileProcessed;
            if (fileProcessed != null)
                fileProcessed(this, System.EventArgs.Empty);
        }

        private void OnFinished()
        {
            System.EventHandler finished = this.Finished;
            if (finished != null)
                finished(this, System.EventArgs.Empty);
        }
    }

Поскольку вы сказали "указанные файлы", я предполагаю, что ваше приложение Winform имеет какую-то сетку или список, или другой элемент управления, с которым взаимодействует пользователь для выбора файлов, которые должны быть обработаны.

Вот пример того, как его использовать:


public class MyForm...
{
  public void Go()
  {
     Queue files = new Queue();
     // enqueue the name/path of all selected files into the queue...
     // now process them
     FileProcessor fp = new FileProcessor();

     // example of using an event
     fp.Finished += this.FileProcessor_Finished;

     fp.ProcessFiles(files);
  }

  private void FileProcessor_Finished(object sender, System.EventArgs e)
  {
     // this event will have been called by a non-ui thread.  Marshal it back to the UI
     if(this.InvokeRequired)
       this.Invoke(FileProcessor_Finished, new object[] {sender, e});
     else
     {
        // handle the event -- this will be run on the UI thread.
     }
  }
}

1 голос
/ 10 марта 2010

Обновление на основе комментариев:
Я не согласен с утверждением, что ThreadPool не сможет справиться с рабочей нагрузкой, с которой вы столкнулись ... давайте рассмотрим вашу проблему и уточним:
1. У вас есть почти 1000 файлов.
2. Для обработки каждого файла может потребоваться до 2 минут интенсивной работы процессора.
3. Для увеличения пропускной способности требуется параллельная обработка.
4. Вы хотите сообщить, когда каждый файл будет завершен, и обновить пользовательский интерфейс.

Реально вы не хотите запускать 1000 потоков, потому что вы ограничены количеством ядер, которые у вас есть ... и, поскольку это требует интенсивной работы процессора, вы, вероятно, максимально увеличите нагрузку на процессор с помощью очень небольшого количества потоков в моих программах обычно оптимально иметь 2-4 потока на ядро).

Таким образом, вы не должны загружать 1000 рабочих элементов в ThreadPool и ожидать увеличения пропускной способности. Вам нужно будет создать среду, в которой вы всегда будете работать с оптимальным количеством потоков, и для этого потребуется некоторое проектирование.

Мне придется немного противоречить моему первоначальному утверждению и на самом деле рекомендовать дизайн «производитель / потребитель». Проверьте этот вопрос для более подробной информации о модели.

Вот как может выглядеть продюсер:

class Producer
{
    private final CountDownLatch _latch;
    private final BlockingQueue _workQueue;
    Producer( CountDownLatch latch, BlockingQueue workQueue)
    {
        _latch = latch;
        _workQueue = workQueue;
    }

    public void Run()
    {
        while(hasMoreFiles)
        {
            // load the file and enqueue it
            _workQueue.Enqueue(nextFileJob);
        }

        _latch.Signal();
    }
}

Вот ваш потребитель:

class Consumer
{
    private final CountDownLatch _latch;
    private final BlockingQueue _workQueue;

    Consumer(CountDownLatch latch, BlockingQueue workQueue, ReportStatusToUI reportDelegate)
    {
        _latch = latch;
        _workQueue = workQueue;
    }

    public void Run()
    {
        while(!terminationCondition)
        {
            // blocks until there is something in the queue
            WorkItem workItem = _workQueue.Dequeue();

            // Work that takes 1-2 minutes
            DoWork(workItem);

            // a delegate that is executed on the UI (use BeginInvoke on the UI)
            reportDelegate(someStatusIndicator);
        }

        _latch.Signal();
    }
}

A CountDownLatch:

public class CountDownLatch
{
    private int m_remain;
    private EventWaitHandle m_event;

    public CountDownLatch(int count)
    {
        Reset(count);
    }

    public void Reset(int count)
    {
        if (count < 0)
            throw new ArgumentOutOfRangeException();
        m_remain = count;
        m_event = new ManualResetEvent(false);
        if (m_remain == 0)
        {
            m_event.Set();
        }
    }

    public void Signal()
    {
        // The last thread to signal also sets the event.
        if (Interlocked.Decrement(ref m_remain) == 0)
            m_event.Set();
    }

    public void Wait()
    {
        m_event.WaitOne();
    }
}

Jicksa's BlockingQueue :

class BlockingQueue<T> {
    private Queue<T> q = new Queue<T>();

    public void Enqueue(T element) {
        q.Enqueue(element);
        lock (q) {
            Monitor.Pulse(q);
        }
    }

    public T Dequeue() {
        lock(q) {
            while (q.Count == 0) {
                Monitor.Wait(q);
            }
            return q.Dequeue();
        }
    }
}

Так, что это оставляет? Ну, теперь все, что вам нужно сделать, это запустить все ваши темы ... вы можете запустить их в ThreadPool, как BackgroundWorker, или каждый как new Thread , и это не делает любая разница .

Вам нужно только создать один Producer и оптимальное количество Consumers, которое будет возможно, учитывая количество ядер, которое у вас есть (около 2-4 Потребителя на ядро).

Родительский поток ( NOT ваш поток пользовательского интерфейса) должен блокироваться, пока все потребительские потоки не будут завершены:

void StartThreads()
{
    CountDownLatch latch = new CountDownLatch(numConsumer+numProducer);
    BlockingQueue<T> workQueue = new BlockingQueue<T>();

    Producer producer = new Producer(latch, workQueue);
    if(youLikeThreads)
    {
        Thread p = new Thread(producer.Run);
        p.IsBackground = true;
        p.Start();
    }
    else if(youLikeThreadPools)
    {
        ThreadPool.QueueUserWorkItem(producer.Run);
    }

    for (int i; i < numConsumers; ++i)
    {
        Consumer consumer = new Consumer(latch, workQueue, theDelegate);

        if(youLikeThreads)
        {
            Thread c = new Thread(consumer.Run);

            c.IsBackground = true;

            c.Start();
        }
        else if(youLikeThreadPools)
        {
            ThreadPool.QueueUserWorkItem(consumer.Run);
        }
    }

    // wait for all the threads to signal
    latch.Wait();

    SayHelloToTheUI();
}

Обратите внимание, что приведенный выше код является только иллюстративным. Вам все еще нужно отправить сигнал завершения на Consumer и Producer, и вам нужно сделать это безопасным для потока способом.

1 голос
/ 10 марта 2010

Как правило, вы должны использовать BackgroundWorker для фоновой обработки для пользовательского интерфейса, поскольку именно для этого и предназначен класс. И обычно пул потоков используется для серверных приложений.

Вы можете попробовать использовать несколько BackgroundWorkers для выполнения того, что вам нужно сделать. Просто добавьте все файлы в очередь, а затем создайте BackgroundWorker для чтения из очереди и обработки следующего файла. Вы могли бы, вероятно, породить до n рабочих для обработки нескольких файлов одновременно; вам просто потребуются некоторые средства для отслеживания того, какой работник обрабатывает каждый файл, чтобы вы сообщали о значительном прогрессе в пользовательский интерфейс.

Чтобы определить, какую работу выполняет каждый работник, вы можете передать аргумент RunWorkerAsync, который идентифицирует поток. Затем к этому аргументу можно получить доступ в DoWork через свойство DoWorkEventArgs.Argument. Чтобы узнать, какой работник сообщает о прогрессе, вы можете добавить обработчик событий для каждого отдельно и / или передать объект в ReportProgress, который идентифицирует работника.

Это помогает?

0 голосов
/ 10 марта 2010

BackhgroundWorker звучит разумно.
Основной вопрос заключается в том, сколько из них должно работать параллельно, поскольку ваша задача, по-видимому, в большей степени связана с вводом-выводом, чем с нагрузкой на процессор, плюс вы можете выиграть, читая и записывая на разные устройства ввода-вывода.

0 голосов
/ 10 марта 2010

Я согласен с Джастином Этьером. BackgroundWorker - это простой в использовании инструмент для работы с потоками.

Я понимаю, что вы столкнулись с ситуацией, когда вам интересно, какую модель потоков использовать. Таким образом, это зависит от объектов, с которыми вы работаете. Позвольте мне объяснить.

Даже если вы хотели бы использовать, скажем, небрежную модель потоков, в которой разработчику не нужно беспокоиться о безопасности потоков, если ваши объекты или библиотеки не являются потокобезопасными, вам нужно будет использовать lock () для таких объектов до они могут быть доступны для следующей темы. Например, коллекции .NET 3.5 не являются потокобезопасными.

Вот связанный вопрос , который должен помочь, кроме того, есть объяснение от самого Эрика Липперта! Я также рекомендую вам посмотреть его блог на MSDN .

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...