Производитель Потребительские гонки - PullRequest
0 голосов
/ 14 апреля 2020

У меня проблема с условиями гонки. Они описаны в примере кода, где я пишу комментарии // POSSIBLE RACE. Этот дизайн - то, что я придумал сам, но у него есть проблемы с гонкой, и я не уверен, как их преодолеть. Возможно, использование семафоров - неправильный выбор.

Сценарий: производитель должен создавать задания, пока есть задания в очереди БД, а потребители все еще обрабатывают задания. Если потребители завершили обработку заданий, производитель должен освободить всех потребителей, а производитель и потребители должны уйти.

Как решить нижеприведенную проблему, чтобы у меня мог быть пул потребителей и один производитель, где производитель сигнализирует потребителям, когда проверять очередь на наличие большего количества товаров, если они исчерпаны?

Должен ли я использовать другой шаблон? Должен ли я использовать семафор, мьютекс или какой-либо другой механизм блокировки?

Спасибо за вашу помощь! Я уже давно пытаюсь решить эту проблему.

Fiddle: https://dotnetfiddle.net/Widget/SeNqQx

public class Producer
{
    readonly int processorCount = Environment.ProcessorCount;
    readonly List<Consumer> consumers = new List<Consumer>();
    ConcurrentQueue<Job> jobs;
    readonly object queueLock = new object();
    readonly Semaphore producerSemaphore;
    readonly Semaphore consumerSemaphore;

    public Producer()
    {
        producerSemaphore = new Semaphore(1, 1);
        consumerSemaphore = new Semaphore(processorCount, processorCount);
    }

    public void StartTask()
    {
        jobs = GetJobs();
        using (var resetEvent = new ManualResetEvent(false))
        {
            for (var i = 0; i < processorCount; i++)
            {
                var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
                consumers.Add(consumer);
                QueueConsumer(consumer, processorCount, resetEvent);
            }

            AddJobsToQueueWhenAvailable(resetEvent);
            resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
        }
    }

    private ConcurrentQueue<Job> GetJobs(){
        var q = new ConcurrentQueue<Job>();
        for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
        return q;
    }

    private void QueueConsumer(Consumer consumer, int numberOfThreadsRunning, ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                consumer.StartJob();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception occurred " + ex);
            }
            finally
            {

                // Safely decrement the counter
                if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
                {
                        resetEvent.Set();
                }
            }
        });
    }
    private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (true) // TODO - replace with cancellation token
            {
                // lock queue - so that no workers will steal another workers item
                lock (queueLock)
                {
                    // check that at least 1 worker is still active
                    if (consumers.TrueForAll(w => !w.IsRunning))
                    {
                        // all jobs complete - release all locks if 0 workers active
                        consumerSemaphore.Release(processorCount);
                        return;
                    }

                    // poll for new items that have been added to the queue
                    var newJobs = GetJobs();

                    // for each item:
                    foreach (var job in newJobs)
                    {
                        // add item to queue
                        jobs.Enqueue(job);

                        // If we have any workers halted, let them know there are new items!
                        if (consumers.Any(w => !w.IsRunning))
                        {
                            // POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
                            // signal worker to continue via semaphore
                            consumerSemaphore.Release(1);
                            // wait until worker thread wakes up and takes item before unlocking queue
                            producerSemaphore.WaitOne();
                        }
                    }
                } // unlock queue

                // sleep for a bit
                Thread.Sleep(500); // TODO - replace with cancellation token
            }
        });
    }
}

public class Consumer
{
    public bool IsRunning;
    ConcurrentQueue<Job> jobs;
    private object queueLock;
    private Semaphore producerSemaphore;
    private Semaphore consumerSemaphore;

    public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
    {
        this.jobs = jobs;
        this.queueLock = queueLock;
        this.producerSemaphore = producerSemaphore;
        this.consumerSemaphore = consumerSemaphore;
    }

    public void StartJob() {
        while(TryGetNextJob(out var job)) {
            // do stuff with job
        }
    }

    private bool TryGetNextJob(out Job nextJob)
    {
        // lock to prevent producer from producing items before we've had a chance to wait
        lock (queueLock)
        {
            if (jobs.TryDequeue(out nextJob))
                return true; // we have an item - let's process it

            // worker halted
            IsRunning = false;
        }

        // wait for signal from producer
        consumerSemaphore.WaitOne();

        // once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
        var itemDequeued = jobs.TryDequeue(out nextJob);
        if (!itemDequeued)
        {
            return false; // looks like it's time to exit
        }

        // another item for us to process 
        IsRunning = true;
        // let producer know it's safe to release queueLock        
        producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)

        return true;
    }

}

public class Job { }

Ответы [ 2 ]

3 голосов
/ 15 апреля 2020

Я бы рекомендовал взглянуть на BlockingCollection . Однако многие потребительские потоки могут вызывать Take, если есть элемент, он будет возвращен, в противном случае поток заблокируется. Он также поддерживает установку ограничения на емкость, чтобы сделать блок добавления потока, если емкость превышена.

Это должно устранить необходимость в семафорах и событиях сброса, а также значительно упростить код в целом. См. Сборник блокировок и Проблема производителя-потребителя для более полного описания.

0 голосов
/ 15 апреля 2020

Спасибо за помощь. Я непременно посмотрю на BlockingCollection .

Так что я на самом деле был недалеко от того, что хотел. Мне просто нужно было прочитать немного больше о семафорах (инициализация с правильным начальным счетом), чтобы код работал правильно, а также несколько других кусочков. Найдите EDIT, чтобы увидеть, что изменилось. Рабочий раствор:

public class Producer
{
    readonly int processorCount = Environment.ProcessorCount;
    readonly List<Consumer> consumers = new List<Consumer>();
    ConcurrentQueue<Job> jobs;
    readonly object queueLock = new object();
    readonly Semaphore producerSemaphore;
    readonly Semaphore consumerSemaphore;
    int numberOfThreadsRunning;

    public Producer()
    {
        producerSemaphore = new Semaphore(0, 1); // EDIT - MUST START WITH 0 INITIALLY
        consumerSemaphore = new Semaphore(0, processorCount); // EDIT - MUST START WITH 0 INITIALLY
        numberOfThreadsRunning = processorCount; // EDIT - take copy so that Interlocked.Decrement references the same int variable in memory
    }

    public void StartTask()
    {
        jobs = GetJobs();
        using (var resetEvent = new ManualResetEvent(false))
        {
            for (var i = 0; i < processorCount; i++)
            {
                var consumer = new Consumer(jobs, queueLock, producerSemaphore, consumerSemaphore);
                consumers.Add(consumer);
                QueueConsumer(consumer, resetEvent);
            }

            AddJobsToQueueWhenAvailable(resetEvent);
            resetEvent.WaitOne(); // waits for QueueConsumer(..) to finish
        }
    }

    private ConcurrentQueue<Job> GetJobs(){
        var q = new ConcurrentQueue<Job>();
        for (var i = 0; i < 5; i++) q.Enqueue(new Job()); // this usually comes from DB queue
        return q;
    }

    private void QueueConsumer(Consumer consumer, ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                consumer.StartJob();
            }
            catch (Exception ex)
            {
                lock (queueLock)
                {
                    consumers.Remove(worker);
                }
                Console.WriteLine("Exception occurred " + ex);
            }
            finally
            {

                // Safely decrement the counter
                if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
                {
                        resetEvent.Set();
                }
            }
        });
    }
    private void AddJobsToQueueWhenAvailable(ManualResetEvent resetEvent)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            while (true) // TODO - replace with cancellation token
            {
                // lock queue - so that no workers will steal another workers item
                lock (queueLock)
                {
                    // check that at least 1 worker is still active
                    if (consumers.TrueForAll(w => !w.IsRunning))
                    {
                        // all jobs complete - release all locks if 0 workers active
                        consumerSemaphore.Release(processorCount);
                        return;
                    }

                    // poll for new items that have been added to the queue
                    var newJobs = GetJobs();

                    // for each item:
                    foreach (var job in newJobs)
                    {
                        // add item to queue
                        jobs.Enqueue(job);

                        // If we have any workers halted, let them know there are new items!
                        if (consumers.Any(w => !w.IsRunning))
                        {
                            // POSSIBLE RACE - Consumer may set IsRunning=false, but haven't called wait yet!
                            // EDIT - Ordering does not matter. If semaphore is Released() before WaitOne() is 
                            //        called, then consumer will just continue as soon as it calls WaitOne()
                            // signal worker to continue via semaphore
                            consumerSemaphore.Release();
                            // wait until worker thread wakes up and takes item before unlocking queue
                            producerSemaphore.WaitOne();
                        }
                    }
                } // unlock queue

                // sleep for a bit
                Thread.Sleep(500); // TODO - replace with cancellation token
            }
        });
    }
}

public class Consumer
{
    public bool IsRunning;
    ConcurrentQueue<Job> jobs;
    private object queueLock;
    private Semaphore producerSemaphore;
    private Semaphore consumerSemaphore;

    public Consumer(ConcurrentQueue<Job> jobs, object queueLock, Semaphore producerSemaphore, Semaphore consumerSemaphore)
    {
        this.jobs = jobs;
        this.queueLock = queueLock;
        this.producerSemaphore = producerSemaphore;
        this.consumerSemaphore = consumerSemaphore;
        CurrentlyProcessing = true; // EDIT - must default to true so producer doesn't exit prematurely

    }

    public void StartJob() {
        while(TryGetNextJob(out var job)) {
            // do stuff with job
        }
    }

    private bool TryGetNextJob(out Job nextJob)
    {
        // lock to prevent producer from producing items before we've had a chance to wait
        lock (queueLock)
        {
            if (jobs.TryDequeue(out nextJob))
                return true; // we have an item - let's process it

            // worker halted
            IsRunning = false;
        }

        // wait for signal from producer
        consumerSemaphore.WaitOne();

        // once received signal, there should be a new item in the queue - if there is not item, it means all children are finished
        var itemDequeued = jobs.TryDequeue(out nextJob);
        if (!itemDequeued)
        {
            return false; // looks like it's time to exit
        }

        // another item for us to process 
        IsRunning = true;
        // let producer know it's safe to release queueLock        
        producerSemaphore.Release(); // POSSIBLE RACE - producer may not have locked yet! (WaitOne)
        // EDIT - Order does not matter. If we call Release() before producer calls WaitOne(), then
        //        Producer will just continue as soon as it calls WaitOne().

        return true;
    }

}

public class Job { }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...