Вы должны начать с общей очереди Producer-Consumer и использовать ее.Реализация этого внутри очереди не очень хорошая идея, так как это не позволяет вам использовать семафоры для сигнализации потоков (или вы можете иметь открытые семафоры в своей очереди, но это действительно плохая идея).
Как только поток A поставил в очередь один рабочий элемент, он должен сообщить семафору, чтобы уведомить поток B. Когда поток B завершил обработку всех элементов, он должен сообщить семафору, чтобы уведомить всех остальных о том, что он завершил.Ваш главный поток должен ждать, пока этот второй семафор узнает, что все сделано.
[Edit]
Во-первых, у вас есть производитель и потребитель:
public interface IProducer<T> : IStoppable
{
/// <summary>
/// Notifies clients when a new item is produced.
/// </summary>
event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}
public interface IConsumer<T> : IStoppable
{
/// <summary>
/// Performs processing of the specified item.
/// </summary>
/// <param name="item">The item.</param>
void ConsumeItem(T item);
}
public interface IStoppable
{
void Stop();
}
Итак, в вашем случае класс, создающий почту, должен будет запустить событие ItemProduced
, а класс, отправляющий его, должен будет реализовать ConsumeItem
.
И тогда вы передадитеэти два экземпляра для экземпляра Worker
:
public class Worker<T>
{
private readonly Object _lock = new Object();
private readonly Queue<T> _queuedItems = new Queue<T>();
private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
private readonly IProducer<T> _producer;
private readonly IConsumer<T> _consumer;
private volatile bool _ending = false;
private Thread _workerThread;
public Worker(IProducer<T> producer, IConsumer<T> consumer)
{
_producer = producer;
_consumer = consumer;
}
public void Start(ThreadPriority priority)
{
_producer.ItemProduced += Producer_ItemProduced;
_ending = false;
// start a new thread
_workerThread = new Thread(new ThreadStart(WorkerLoop));
_workerThread.IsBackground = true;
_workerThread.Priority = priority;
_workerThread.Start();
}
public void Stop()
{
_producer.ItemProduced -= Producer_ItemProduced;
_ending = true;
// signal the consumer, in case it is idle
_itemReadyEvt.Set();
_workerThread.Join();
}
private void Producer_ItemProduced
(object sender, ProducedItemEventArgs<T> e)
{
lock (_lock) { _queuedItems.Enqueue(e.Item); }
// notify consumer thread
_itemReadyEvt.Set();
}
private void WorkerLoop()
{
while (!_ending)
{
_itemReadyEvt.WaitOne(-1, false);
T singleItem = default(T);
lock (_lock)
{
if (_queuedItems.Count > 0)
{
singleItem = _queuedItems.Dequeue();
}
}
while (singleItem != null)
{
try
{
_consumer.ConsumeItem(singleItem);
}
catch (Exception ex)
{
// handle exception, fire an event
// or something. Otherwise this
// worker thread will die and you
// will have no idea what happened
}
lock (_lock)
{
if (_queuedItems.Count > 0)
{
singleItem = _queuedItems.Dequeue();
}
}
}
}
} // WorkerLoop
} // Worker
Это общая идея, могут потребоваться некоторые дополнительные настройки.
Чтобы использовать его, вам нужно, чтобы ваши классы реализовалиэти два интерфейса:
IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();
Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();
// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();
// following line will block this (calling) thread
// until all items are consumed
worker.Stop();
Самое замечательное в этом то, что:
- вы можете иметь столько работников, сколько вам нравится
- несколько работников могут принимать предметы изодин и тот же производитель
- несколько работников могут отправлять элементы одному и тому же потребителю (хотя это означает, что вам нужно принять случай, когда потребитель реализован в поточно-ориентированном режиме)