Дрю прав, я думаю, что ConcurrentQueue, хотя он звучит идеально для работы, на самом деле является базовой структурой данных, которую использует BlockingCollection. Кажется, мне тоже очень далеко.
Проверьте главу 7 этой книги *
http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1
и это объяснит, как использовать BlockingCollection и иметь несколько производителей и нескольких потребителей, каждый из которых снимает «очередь». Вы захотите взглянуть на метод «GetConsumingEnumerable ()» и, возможно, просто вызовите .ToObservable () для этого.
* остальная часть книги довольно средняя.
редактирование:
Вот пример программы, которая, я думаю, делает то, что вы хотите?
class Program
{
private static ManualResetEvent _mre = new ManualResetEvent(false);
static void Main(string[] args)
{
var theQueue = new BlockingCollection<string>();
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));
theQueue.GetConsumingEnumerable()
.ToObservable(Scheduler.TaskPool)
.Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));
LoadQueue(theQueue, "Producer A");
LoadQueue(theQueue, "Producer B");
LoadQueue(theQueue, "Producer C");
_mre.Set();
Console.WriteLine("Processing now....");
Console.ReadLine();
}
private static void ProcessNewValue(string value, string consumerName, int delay)
{
Thread.SpinWait(delay);
Console.WriteLine("{1} consuming {0}", value, consumerName);
}
private static void LoadQueue(BlockingCollection<string> target, string prefix)
{
var thread = new Thread(() =>
{
_mre.WaitOne();
for (int i = 0; i < 100; i++)
{
target.Add(string.Format("{0} {1}", prefix, i));
}
});
thread.Start();
}
}