У меня проблема с условиями гонки. Они описаны в примере кода, где я пишу комментарии // POSSIBLE RACE
. Этот дизайн - то, что я придумал сам, но у него есть проблемы с гонкой, и я не уверен, как их преодолеть. Возможно, использование семафоров - неправильный выбор.
Сценарий: производитель должен создавать задания, пока есть задания в очереди БД, а потребители все еще обрабатывают задания. Если потребители завершили обработку заданий, производитель должен освободить всех потребителей, а производитель и потребители должны уйти.
Как решить нижеприведенную проблему, чтобы у меня мог быть пул потребителей и один производитель, где производитель сигнализирует потребителям, когда проверять очередь на наличие большего количества товаров, если они исчерпаны?
Должен ли я использовать другой шаблон? Должен ли я использовать семафор, мьютекс или какой-либо другой механизм блокировки?
Спасибо за вашу помощь! Я уже давно пытаюсь решить эту проблему.
Fiddle: https://dotnetfiddle.net/Widget/SeNqQx
public class Producer
{
readonly int processorCount = Environment.ProcessorCount;
readonly List<Consumer> consumers = new List<Consumer>();
ConcurrentQueue<Job> jobs;
readonly object queueLock = new object();
readonly Semaphore producerSemaphore;
readonly Semaphore consumerSemaphore;
public Producer()
{
producerSemaphore = new Semaphore(1, 1);
consumerSemaphore = new Semaphore(processorCount, processorCount);
}
public void StartTask()
{
jobs = GetJobs();
using (var resetEvent = new ManualResetEvent(false))
{
for (var i = 0; i < processorCount; i++)
{
var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
consumers.Add(consumer);
QueueConsumer(consumer, processorCount, resetEvent);
}
AddJobsToQueueWhenAvailable(resetEvent);
resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
}
}
private ConcurrentQueue<Job> GetJobs(){
var q = new ConcurrentQueue<Job>();
for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
return q;
}
private void QueueConsumer(Consumer consumer, int numberOfThreadsRunning, ManualResetEvent resetEvent)
{
ThreadPool.QueueUserWorkItem(_ =>
{
try
{
consumer.StartJob();
}
catch (Exception ex)
{
Console.WriteLine("Exception occurred " + ex);
}
finally
{
// Safely decrement the counter
if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
{
resetEvent.Set();
}
}
});
}
private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
{
ThreadPool.QueueUserWorkItem(_ =>
{
while (true) // TODO - replace with cancellation token
{
// lock queue - so that no workers will steal another workers item
lock (queueLock)
{
// check that at least 1 worker is still active
if (consumers.TrueForAll(w => !w.IsRunning))
{
// all jobs complete - release all locks if 0 workers active
consumerSemaphore.Release(processorCount);
return;
}
// poll for new items that have been added to the queue
var newJobs = GetJobs();
// for each item:
foreach (var job in newJobs)
{
// add item to queue
jobs.Enqueue(job);
// If we have any workers halted, let them know there are new items!
if (consumers.Any(w => !w.IsRunning))
{
// POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
// signal worker to continue via semaphore
consumerSemaphore.Release(1);
// wait until worker thread wakes up and takes item before unlocking queue
producerSemaphore.WaitOne();
}
}
} // unlock queue
// sleep for a bit
Thread.Sleep(500); // TODO - replace with cancellation token
}
});
}
}
public class Consumer
{
public bool IsRunning;
ConcurrentQueue<Job> jobs;
private object queueLock;
private Semaphore producerSemaphore;
private Semaphore consumerSemaphore;
public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
{
this.jobs = jobs;
this.queueLock = queueLock;
this.producerSemaphore = producerSemaphore;
this.consumerSemaphore = consumerSemaphore;
}
public void StartJob() {
while(TryGetNextJob(out var job)) {
// do stuff with job
}
}
private bool TryGetNextJob(out Job nextJob)
{
// lock to prevent producer from producing items before we've had a chance to wait
lock (queueLock)
{
if (jobs.TryDequeue(out nextJob))
return true; // we have an item - let's process it
// worker halted
IsRunning = false;
}
// wait for signal from producer
consumerSemaphore.WaitOne();
// once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
var itemDequeued = jobs.TryDequeue(out nextJob);
if (!itemDequeued)
{
return false; // looks like it's time to exit
}
// another item for us to process
IsRunning = true;
// let producer know it's safe to release queueLock
producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
return true;
}
}
public class Job { }