Если у вас есть доступ к .NET 4.0, то, что вы хотите сделать, может быть достигнуто с помощью BlockingCollection .
Если вы хотите сделать это самостоятельно с помощью класса Monitor
и сигнализации с помощью Pulse()
, вы на правильном пути.
Вы получаете исключение, потому что для вызова Wait()
, Pulse()
и PulseAll()
, вы должны владеть lock
для указанного объекта . Вы пропустите это на waiting
.
Пример базовой поточно-ориентированной очереди, которую можно использовать:
- с
foreach
на потребителя,
- с
while
или вашей любимой условной конструкцией на стороне производителя,
- обрабатывает несколько производителей / потребителей и
- использует
lock()
, Monitor.Pulse()
, Monitor.PulseAll()
и Monitor.Wait()
:
.
public class SignaledQueue<T>
{
Queue<T> queue = new Queue<T>();
volatile bool shutDown = false;
public bool Enqueue(T item)
{
if (!shutDown)
{
lock (queue)
{
queue.Enqueue(item);
//Pulse only if there can be waiters.
if (queue.Count == 1)
{
Monitor.PulseAll(queue);
}
}
return true;
}
//Indicate that processing should stop.
return false;
}
public IEnumerable<T> DequeueAll()
{
while (!shutDown)
{
do
{
T item;
lock (queue)
{
//If the queue is empty, wait.
if (queue.Count == 0)
{
if (shutDown) break;
Monitor.Wait(queue);
if (queue.Count == 0) break;
}
item = queue.Dequeue();
}
yield return item;
} while (!shutDown);
}
}
public void SignalShutDown()
{
shutDown = true;
lock (queue)
{
//Signal all waiting consumers with PulseAll().
Monitor.PulseAll(queue);
}
}
}
Пример использования:
class Program
{
static void Main(string[] args)
{
int numProducers = 4, numConsumers = 2;
SignaledQueue<int> queue = new SignaledQueue<int>();
ParameterizedThreadStart produce = delegate(object obj)
{
Random rng = new Random((int)obj);
int num = 0;
while (queue.Enqueue(++num))
{
Thread.Sleep(rng.Next(100));
}
};
ThreadStart consume = delegate
{
foreach (int num in queue.DequeueAll())
{
Console.Write(" {0}", num);
}
};
Random seedRng = new Random();
for (int i = 0; i < numProducers; i++)
{
new Thread(produce).Start(seedRng.Next());
}
for (int i = 0; i < numConsumers; i++)
{
new Thread(consume).Start();
}
Console.ReadKey(true);
queue.SignalShutDown();
}
}