C # как только основной поток спит, все потоки остановлены - PullRequest
4 голосов
/ 19 февраля 2010

У меня есть класс, на котором работает модель Producer-Consumer, например:

public class SyncEvents
{
    public bool waiting;

    public SyncEvents()
    {
        waiting = true;
    }
}

public class Producer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;
    private Object _waitAck;

    public Producer(Queue<Delegate> q, SyncEvents sync, Object obj)
    {
        _queue = q;
        _sync = sync;
        _waitAck = obj;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                Monitor.Wait(_sync, 0);
                if (_queue.Count > 0)
                {
                    _sync.waiting = false;
                }
                else
                {
                    _sync.waiting = true;
                    lock (_waitAck)
                    {
                        Monitor.Pulse(_waitAck);
                    }
                }
                Monitor.Pulse(_sync);
            }
        }
    }

}

public class Consumer
{
    private readonly Queue<Delegate> _queue;
    private SyncEvents _sync;

    private int count = 0;

    public Consumer(Queue<Delegate> q, SyncEvents sync)
    {
        _queue = q;
        _sync = sync;
    }

    public void ThreadRun()
    {
        lock (_sync)
        {
            while (true)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_sync);
                }

                Delegate query = _queue.Dequeue();
                query.DynamicInvoke(null);

                count++;

                Monitor.Pulse(_sync);
            }
        }
    }
}

/// <summary>
/// Act as a consumer to the queries produced by the DataGridViewCustomCell
/// </summary>
public class QueryThread
{
    private SyncEvents _syncEvents = new SyncEvents();
    private Object waitAck = new Object();
    private Queue<Delegate> _queryQueue = new Queue<Delegate>();

    Producer queryProducer;
    Consumer queryConsumer;

    public QueryThread()
    {
        queryProducer = new Producer(_queryQueue, _syncEvents, waitAck);
        queryConsumer = new Consumer(_queryQueue, _syncEvents);

        Thread producerThread = new Thread(queryProducer.ThreadRun);
        Thread consumerThread = new Thread(queryConsumer.ThreadRun);

        producerThread.IsBackground = true;
        consumerThread.IsBackground = true;

        producerThread.Start();
        consumerThread.Start();
    }

    public bool isQueueEmpty()
    {
        return _syncEvents.waiting;
    }

    public void wait()
    {
        lock (waitAck)
        {
            while (_queryQueue.Count > 0)
            {
                Monitor.Wait(waitAck);
            }
        }
    }

    public void Enqueue(Delegate item)
    {
        _queryQueue.Enqueue(item);
    }
}

Код работает без сбоев, но функция wait ().В некоторых случаях я хочу подождать, пока все функции в очереди не будут завершены, поэтому я сделал функцию wait ().

Производитель будет запускать импульс waitAck в подходящее время.

Однако, когда строка «Monitor.Wait (waitAck);»запускается в функции wait (), все потоки останавливаются, включая поток производителя и потребителя.

Почему это происходит и как я могу это решить?спасибо!

Ответы [ 2 ]

1 голос
/ 19 февраля 2010

Вот мой взгляд на ваш код:

public class ProducerConsumer
{
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;
    private static Object _sync = new Object();

    public ProducerConsumer(Queue<Delegate> queue)
    {
        lock (_sync)
        {
            // Note: I would recommend that you don't even
            // bother with taking in a queue.  You should be able
            // to just instantiate a new Queue<Delegate>()
            // and use it when you Enqueue.  There is nothing that
            // you really need to pass into the constructor.
            _queue = queue;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public override void Enqueue(Delegate value)
    {
        lock (_sync)
        {
            _queue.Enqueue(value);
            _ready.Set();
        }
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        Delegate query;
        try
        {
            while (true)
            {
                _ready.WaitOne();
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }


    protected override void Dispose(bool disposing)
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
            }
        }
        base.Dispose(disposing);
    }


}

Я не совсем уверен, чего вы пытаетесь достичь с помощью функции ожидания ... Я предполагаю, что вы пытаетесь установить какой-то тип ограничения на количество элементов, которые могут быть поставлены в очередь. В этом случае просто сгенерируйте исключение или верните сигнал сбоя, когда в очереди слишком много элементов, клиент, который вызывает Enqueue, будет повторять попытки, пока очередь не сможет принять больше элементов. Принятие оптимистического подхода избавит вас от МНОГО головных болей и просто поможет вам избавиться от множества сложной логики.

Если вы ДЕЙСТВИТЕЛЬНО хотите провести там ожидание, тогда я, вероятно, могу помочь вам найти лучший подход. Дайте мне знать, чего вы пытаетесь достичь с помощью ожидания, и я вам помогу.

Примечание: Я взял этот код из одного из моих проектов, немного изменил его и разместил здесь ... могут быть небольшие синтаксические ошибки, но логика должна быть правильной.

ОБНОВЛЕНИЕ: На основании ваших комментариев я внес некоторые изменения: я добавил еще один ManualResetEvent в класс, поэтому, когда вы вызываете BlockQueue(), он дает вам событие, которое вы можете подождать, и устанавливает флаг, чтобы функция Enqueue не ставила в очередь больше элементов. Как только все запросы в очереди будут обслужены, для флага будет установлено значение true, а для события _wait установлено, чтобы тот, кто ожидает, получил сигнал.

public class ProducerConsumer
{
    private bool _canEnqueue;
    private ManualResetEvent _ready;
    private Queue<Delegate> _queue; 
    private Thread _consumerService;

    private static Object _sync = new Object();
    private static ManualResetEvent _wait = new ManualResetEvent(false);

    public ProducerConsumer()
    {
        lock (_sync)
        {
            _queue = new Queue<Delegate> _queue;
            _canEnqueue = true;
            _ready = new ManualResetEvent(false);
            _consumerService = new Thread(Run);
            _consumerService.IsBackground = true;
            _consumerService.Start();
        }
    }

    public bool Enqueue(Delegate value)
    {
        lock (_sync)
        {
            // Don't allow anybody to enqueue
            if( _canEnqueue )
            {
                _queue.Enqueue(value);
                _ready.Set();
                return true;
            }
        }
        // Whoever is calling Enqueue should try again later.
        return false;
    }

    // The consumer blocks until the producer puts something in the queue.
    private void Run()
    {
        try
        {
            while (true)
            {
                // Wait for a query to be enqueued
                _ready.WaitOne();

                // Process the query
                lock (_sync)
                {
                    if (_queue.Count > 0)
                    {
                        Delegate query = _queue.Dequeue();
                        query.DynamicInvoke(null);
                    }
                    else
                    {
                        _canEnqueue = true;
                        _ready.Reset();
                        _wait.Set();
                        continue;
                    }
                }
            }
        }
        catch (ThreadInterruptedException)
        {
            _queue.Clear();
            return;
        }
    }

    // Block your queue from enqueuing, return null
    // if the queue is already empty.
    public ManualResetEvent BlockQueue()
    {
        lock(_sync)
        {
            if( _queue.Count > 0 )
            {
                _canEnqueue = false;
                _wait.Reset();
            }
            else
            {
                // You need to tell the caller that they can't
                // block your queue while it's empty. The caller
                // should check if the result is null before calling
                // WaitOne().
                return null;
            }
        }
        return _wait;
    }

    protected override void Dispose(bool disposing)
    {
        lock (_sync)
        {
            if (_consumerService != null)
            {
                _consumerService.Interrupt();
                // Set wait when you're disposing the queue
                // so that nobody is left with a lingering wait.
                _wait.Set();
            }
        }
        base.Dispose(disposing);
    }
}
1 голос
/ 19 февраля 2010

Кажется очень маловероятным, что все потоки на самом деле остановятся, хотя я должен отметить, что во избежание ложных пробуждений вам, вероятно, следует использовать цикл while вместо оператора if:

lock (waitAck)
{
    while(queryProducer.secondQueue.Count > 0)
    {
        Monitor.Wait(waitAck);
    }
}

Тот факт, что вы звоните Monitor.Wait, означает, что waitAck должен быть освобожден, чтобы он не препятствовал блокировке пользовательских потоков ...

Не могли бы вы дать больше информации о том, как «прекращаются» потоки производителя / потребителя? Похоже, они только что зашли в тупик?

Ваш производитель использует Notify или NotifyAll? Теперь у вас есть дополнительный поток ожидания, поэтому, если вы используете только Notify, он будет освобождать только один поток ... трудно понять, является ли это проблемой без подробностей ваших Producer и Consumer классы.

Если бы вы могли показать короткую, но полную программу для демонстрации проблемы, это помогло бы.

РЕДАКТИРОВАТЬ: Хорошо, теперь вы опубликовали код, я вижу ряд проблем:

  • Наличие такого большого количества открытых переменных - путь к катастрофе. Ваши классы должны инкапсулировать их функциональность, чтобы другому коду не приходилось ковыряться в поисках фрагментов реализации. (Например, ваш вызывающий код здесь действительно не должен иметь доступа к очереди.)

  • Вы добавляете элементы непосредственно во вторую очередь, что означает, что вы не можете эффективно разбудить производителя, чтобы добавить их в первую очередь. Почему у вас даже есть несколько очередей?

  • Вы всегда ждете _sync в ветке продюсера ... почему? Что собирается уведомить об этом для начала? Вообще говоря, поток производителя не должен ждать, если у вас нет ограниченного буфера

  • У вас есть статическая переменная (_waitAck), которая перезаписывается при каждом создании нового экземпляра. Это плохая идея.

Вы также не показали свой класс SyncEvents - это значит делать что-нибудь интересное?

Если честно, похоже, что у вас довольно странный дизайн - вам лучше всего начинать заново с нуля. Попробуйте инкапсулировать всю очередь производителя / потребителя в одном классе, который имеет методы Produce и Consume, а также WaitForEmpty (или что-то в этом роде). Я думаю, что таким образом вы найдете логику синхронизации намного проще.

...