У меня проблема с условиями гонки. Они описаны в примере кода, где я пишу комментарии // POSSIBLE RACE
. Этот дизайн - то, что я придумал сам, но у него есть проблемы с гонкой, и я не уверен, как их преодолеть. Возможно, использование семафоров - неправильный выбор.
Сценарий: производитель должен создавать задания, пока есть задания в очереди БД, а потребители все еще обрабатывают задания. Если потребители завершили обработку заданий, производитель должен освободить всех потребителей, а производитель и потребители должны уйти.
Как решить нижеприведенную проблему, чтобы у меня мог быть пул потребителей и один производитель, где производитель сигнализирует потребителям, когда проверять очередь на наличие большего количества товаров, если они исчерпаны?
Должен ли я использовать другой шаблон? Должен ли я использовать семафор, мьютекс или какой-либо другой механизм блокировки?
Спасибо за вашу помощь! Я уже давно пытаюсь решить эту проблему.
Fiddle: https://dotnetfiddle.net/Widget/SeNqQx
public class Producer
readonly int processorCount = Environment.ProcessorCount;
readonly List<Consumer> consumers = new List<Consumer>();
ConcurrentQueue<Job> jobs;
readonly object queueLock = new object();
readonly Semaphore producerSemaphore;
readonly Semaphore consumerSemaphore;
public Producer()
producerSemaphore = new Semaphore(1, 1);
consumerSemaphore = new Semaphore(processorCount, processorCount);
public void StartTask()
jobs = GetJobs();
using (var resetEvent = new ManualResetEvent(false))
for (var i = 0; i < processorCount; i++)
var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
QueueConsumer(consumer, processorCount, resetEvent);
resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
private ConcurrentQueue<Job> GetJobs(){
var q = new ConcurrentQueue<Job>();
for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
return q;
private void QueueConsumer(Consumer consumer, int numberOfThreadsRunning, ManualResetEvent resetEvent)
ThreadPool.QueueUserWorkItem(_ =>
catch (Exception ex)
Console.WriteLine("Exception occurred " + ex);
// Safely decrement the counter
if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
ThreadPool.QueueUserWorkItem(_ =>
while (true) // TODO - replace with cancellation token
// lock queue - so that no workers will steal another workers item
lock (queueLock)
// check that at least 1 worker is still active
if (consumers.TrueForAll(w => !w.IsRunning))
// all jobs complete - release all locks if 0 workers active
// poll for new items that have been added to the queue
var newJobs = GetJobs();
// for each item:
foreach (var job in newJobs)
// add item to queue
// If we have any workers halted, let them know there are new items!
if (consumers.Any(w => !w.IsRunning))
// POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
// signal worker to continue via semaphore
// wait until worker thread wakes up and takes item before unlocking queue
} // unlock queue
// sleep for a bit
Thread.Sleep(500); // TODO - replace with cancellation token
public class Consumer
public bool IsRunning;
ConcurrentQueue<Job> jobs;
private object queueLock;
private Semaphore producerSemaphore;
private Semaphore consumerSemaphore;
public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
this.jobs = jobs;
this.queueLock = queueLock;
this.producerSemaphore = producerSemaphore;
this.consumerSemaphore = consumerSemaphore;
public void StartJob() {
while(TryGetNextJob(out var job)) {
// do stuff with job
private bool TryGetNextJob(out Job nextJob)
// lock to prevent producer from producing items before we've had a chance to wait
lock (queueLock)
if (jobs.TryDequeue(out nextJob))
return true; // we have an item - let's process it
// worker halted
IsRunning = false;
// wait for signal from producer
// once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
var itemDequeued = jobs.TryDequeue(out nextJob);
if (!itemDequeued)
return false; // looks like it's time to exit
// another item for us to process
IsRunning = true;
// let producer know it's safe to release queueLock
producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
return true;
public class Job { }