Вот мой взгляд на ваш код:
public class ProducerConsumer
{
private ManualResetEvent _ready;
private Queue<Delegate> _queue;
private Thread _consumerService;
private static Object _sync = new Object();
public ProducerConsumer(Queue<Delegate> queue)
{
lock (_sync)
{
// Note: I would recommend that you don't even
// bother with taking in a queue. You should be able
// to just instantiate a new Queue<Delegate>()
// and use it when you Enqueue. There is nothing that
// you really need to pass into the constructor.
_queue = queue;
_ready = new ManualResetEvent(false);
_consumerService = new Thread(Run);
_consumerService.IsBackground = true;
_consumerService.Start();
}
}
public override void Enqueue(Delegate value)
{
lock (_sync)
{
_queue.Enqueue(value);
_ready.Set();
}
}
// The consumer blocks until the producer puts something in the queue.
private void Run()
{
Delegate query;
try
{
while (true)
{
_ready.WaitOne();
lock (_sync)
{
if (_queue.Count > 0)
{
query = _queue.Dequeue();
query.DynamicInvoke(null);
}
else
{
_ready.Reset();
continue;
}
}
}
}
catch (ThreadInterruptedException)
{
_queue.Clear();
return;
}
}
protected override void Dispose(bool disposing)
{
lock (_sync)
{
if (_consumerService != null)
{
_consumerService.Interrupt();
}
}
base.Dispose(disposing);
}
}
Я не совсем уверен, чего вы пытаетесь достичь с помощью функции ожидания ... Я предполагаю, что вы пытаетесь установить какой-то тип ограничения на количество элементов, которые могут быть поставлены в очередь. В этом случае просто сгенерируйте исключение или верните сигнал сбоя, когда в очереди слишком много элементов, клиент, который вызывает Enqueue, будет повторять попытки, пока очередь не сможет принять больше элементов. Принятие оптимистического подхода избавит вас от МНОГО головных болей и просто поможет вам избавиться от множества сложной логики.
Если вы ДЕЙСТВИТЕЛЬНО хотите провести там ожидание, тогда я, вероятно, могу помочь вам найти лучший подход. Дайте мне знать, чего вы пытаетесь достичь с помощью ожидания, и я вам помогу.
Примечание: Я взял этот код из одного из моих проектов, немного изменил его и разместил здесь ... могут быть небольшие синтаксические ошибки, но логика должна быть правильной.
ОБНОВЛЕНИЕ: На основании ваших комментариев я внес некоторые изменения: я добавил еще один ManualResetEvent
в класс, поэтому, когда вы вызываете BlockQueue()
, он дает вам событие, которое вы можете подождать, и устанавливает флаг, чтобы функция Enqueue не ставила в очередь больше элементов. Как только все запросы в очереди будут обслужены, для флага будет установлено значение true, а для события _wait
установлено, чтобы тот, кто ожидает, получил сигнал.
public class ProducerConsumer
{
private bool _canEnqueue;
private ManualResetEvent _ready;
private Queue<Delegate> _queue;
private Thread _consumerService;
private static Object _sync = new Object();
private static ManualResetEvent _wait = new ManualResetEvent(false);
public ProducerConsumer()
{
lock (_sync)
{
_queue = new Queue<Delegate> _queue;
_canEnqueue = true;
_ready = new ManualResetEvent(false);
_consumerService = new Thread(Run);
_consumerService.IsBackground = true;
_consumerService.Start();
}
}
public bool Enqueue(Delegate value)
{
lock (_sync)
{
// Don't allow anybody to enqueue
if( _canEnqueue )
{
_queue.Enqueue(value);
_ready.Set();
return true;
}
}
// Whoever is calling Enqueue should try again later.
return false;
}
// The consumer blocks until the producer puts something in the queue.
private void Run()
{
try
{
while (true)
{
// Wait for a query to be enqueued
_ready.WaitOne();
// Process the query
lock (_sync)
{
if (_queue.Count > 0)
{
Delegate query = _queue.Dequeue();
query.DynamicInvoke(null);
}
else
{
_canEnqueue = true;
_ready.Reset();
_wait.Set();
continue;
}
}
}
}
catch (ThreadInterruptedException)
{
_queue.Clear();
return;
}
}
// Block your queue from enqueuing, return null
// if the queue is already empty.
public ManualResetEvent BlockQueue()
{
lock(_sync)
{
if( _queue.Count > 0 )
{
_canEnqueue = false;
_wait.Reset();
}
else
{
// You need to tell the caller that they can't
// block your queue while it's empty. The caller
// should check if the result is null before calling
// WaitOne().
return null;
}
}
return _wait;
}
protected override void Dispose(bool disposing)
{
lock (_sync)
{
if (_consumerService != null)
{
_consumerService.Interrupt();
// Set wait when you're disposing the queue
// so that nobody is left with a lingering wait.
_wait.Set();
}
}
base.Dispose(disposing);
}
}