Обновление на основе комментариев:
Я не согласен с утверждением, что ThreadPool не сможет справиться с рабочей нагрузкой, с которой вы столкнулись ... давайте рассмотрим вашу проблему и уточним:
1. У вас есть почти 1000 файлов.
2. Для обработки каждого файла может потребоваться до 2 минут интенсивной работы процессора.
3. Для увеличения пропускной способности требуется параллельная обработка.
4. Вы хотите сообщить, когда каждый файл будет завершен, и обновить пользовательский интерфейс.
Реально вы не хотите запускать 1000 потоков, потому что вы ограничены количеством ядер, которые у вас есть ... и, поскольку это требует интенсивной работы процессора, вы, вероятно, максимально увеличите нагрузку на процессор с помощью очень небольшого количества потоков в моих программах обычно оптимально иметь 2-4 потока на ядро).
Таким образом, вы не должны загружать 1000 рабочих элементов в ThreadPool
и ожидать увеличения пропускной способности. Вам нужно будет создать среду, в которой вы всегда будете работать с оптимальным количеством потоков, и для этого потребуется некоторое проектирование.
Мне придется немного противоречить моему первоначальному утверждению и на самом деле рекомендовать дизайн «производитель / потребитель». Проверьте этот вопрос для более подробной информации о модели.
Вот как может выглядеть продюсер:
class Producer
{
private final CountDownLatch _latch;
private final BlockingQueue _workQueue;
Producer( CountDownLatch latch, BlockingQueue workQueue)
{
_latch = latch;
_workQueue = workQueue;
}
public void Run()
{
while(hasMoreFiles)
{
// load the file and enqueue it
_workQueue.Enqueue(nextFileJob);
}
_latch.Signal();
}
}
Вот ваш потребитель:
class Consumer
{
private final CountDownLatch _latch;
private final BlockingQueue _workQueue;
Consumer(CountDownLatch latch, BlockingQueue workQueue, ReportStatusToUI reportDelegate)
{
_latch = latch;
_workQueue = workQueue;
}
public void Run()
{
while(!terminationCondition)
{
// blocks until there is something in the queue
WorkItem workItem = _workQueue.Dequeue();
// Work that takes 1-2 minutes
DoWork(workItem);
// a delegate that is executed on the UI (use BeginInvoke on the UI)
reportDelegate(someStatusIndicator);
}
_latch.Signal();
}
}
A CountDownLatch
:
public class CountDownLatch
{
private int m_remain;
private EventWaitHandle m_event;
public CountDownLatch(int count)
{
Reset(count);
}
public void Reset(int count)
{
if (count < 0)
throw new ArgumentOutOfRangeException();
m_remain = count;
m_event = new ManualResetEvent(false);
if (m_remain == 0)
{
m_event.Set();
}
}
public void Signal()
{
// The last thread to signal also sets the event.
if (Interlocked.Decrement(ref m_remain) == 0)
m_event.Set();
}
public void Wait()
{
m_event.WaitOne();
}
}
Jicksa's BlockingQueue :
class BlockingQueue<T> {
private Queue<T> q = new Queue<T>();
public void Enqueue(T element) {
q.Enqueue(element);
lock (q) {
Monitor.Pulse(q);
}
}
public T Dequeue() {
lock(q) {
while (q.Count == 0) {
Monitor.Wait(q);
}
return q.Dequeue();
}
}
}
Так, что это оставляет? Ну, теперь все, что вам нужно сделать, это запустить все ваши темы ... вы можете запустить их в ThreadPool
, как BackgroundWorker
, или каждый как new Thread
, и это не делает любая разница .
Вам нужно только создать один Producer
и оптимальное количество Consumers
, которое будет возможно, учитывая количество ядер, которое у вас есть (около 2-4 Потребителя на ядро).
Родительский поток ( NOT ваш поток пользовательского интерфейса) должен блокироваться, пока все потребительские потоки не будут завершены:
void StartThreads()
{
CountDownLatch latch = new CountDownLatch(numConsumer+numProducer);
BlockingQueue<T> workQueue = new BlockingQueue<T>();
Producer producer = new Producer(latch, workQueue);
if(youLikeThreads)
{
Thread p = new Thread(producer.Run);
p.IsBackground = true;
p.Start();
}
else if(youLikeThreadPools)
{
ThreadPool.QueueUserWorkItem(producer.Run);
}
for (int i; i < numConsumers; ++i)
{
Consumer consumer = new Consumer(latch, workQueue, theDelegate);
if(youLikeThreads)
{
Thread c = new Thread(consumer.Run);
c.IsBackground = true;
c.Start();
}
else if(youLikeThreadPools)
{
ThreadPool.QueueUserWorkItem(consumer.Run);
}
}
// wait for all the threads to signal
latch.Wait();
SayHelloToTheUI();
}
Обратите внимание, что приведенный выше код является только иллюстративным. Вам все еще нужно отправить сигнал завершения на Consumer
и Producer
, и вам нужно сделать это безопасным для потока способом.