Исключение синхронизации - PullRequest
3 голосов
/ 29 марта 2010

У меня есть два потока, один поток обрабатывает очередь, а другой поток добавляет вещи в очередь.

  1. Я хочу перевести поток обработки очереди в спящий режим, когда он завершит обработку очереди
  2. Я хочу, чтобы 2-й поток сказал ему проснуться, когда он добавил элемент в очередь

Однако эти функции вызывают System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of code при вызове Monitor.PulseAll(waiting);, потому что я не синхронизировал функцию с ожидающим объектом. [что я не хочу делать, я хочу иметь возможность обрабатывать при добавлении элементов в очередь]. Как мне этого добиться?

Queue<object> items = new Queue<object>();
object waiting = new object();

1-ая тема

public void ProcessQueue()
{
 while (true)
 {
   if (items.Count == 0)
     Monitor.Wait(waiting);

    object real = null;
    lock(items) {
    object item = items.Dequeue();
    real = item;
    }
    if(real == null)
        continue;
    .. bla bla bla
 } 
}

2-й поток включает

public void AddItem(object o)
{
 ... bla bla bla
 lock(items)
 {
 items.Enqueue(o);
 }
 Monitor.PulseAll(waiting);
}

Ответы [ 4 ]

2 голосов
/ 31 марта 2010

Ответ содержится в сообщении об ошибке, которое вы разместили: «Метод синхронизации объектов был вызван из несинхронизированного блока кода в Monitor.PulseAll (ожидание);»

Вы должны вызвать Monitor.PulseAll (ожидание) из блока блокировки (ожидания).

Также ... вы должны вызывать Monitor.Wait также из блока блокировки.

2 голосов
/ 31 марта 2010

Если у вас есть доступ к .NET 4.0, то, что вы хотите сделать, может быть достигнуто с помощью BlockingCollection .
Если вы хотите сделать это самостоятельно с помощью класса Monitor и сигнализации с помощью Pulse(), вы на правильном пути.
Вы получаете исключение, потому что для вызова Wait(), Pulse() и PulseAll(), вы должны владеть lock для указанного объекта . Вы пропустите это на waiting.

Пример базовой поточно-ориентированной очереди, которую можно использовать:

  • с foreach на потребителя,
  • с while или вашей любимой условной конструкцией на стороне производителя,
  • обрабатывает несколько производителей / потребителей и
  • использует lock(), Monitor.Pulse(), Monitor.PulseAll() и Monitor.Wait():

.

public class SignaledQueue<T>
{
    Queue<T> queue = new Queue<T>();
    volatile bool shutDown = false;

    public bool Enqueue(T item)
    {
        if (!shutDown)
        {
            lock (queue)
            {
                queue.Enqueue(item);
                //Pulse only if there can be waiters.
                if (queue.Count == 1)
                {
                    Monitor.PulseAll(queue);
                }
            }
            return true;
        }
        //Indicate that processing should stop.
        return false;
    }

    public IEnumerable<T> DequeueAll()
    {
        while (!shutDown)
        {
            do
            {
                T item;
                lock (queue)
                {
                    //If the queue is empty, wait.
                    if (queue.Count == 0)
                    {
                        if (shutDown) break;
                        Monitor.Wait(queue);
                        if (queue.Count == 0) break;
                    }
                    item = queue.Dequeue();
                }
                yield return item;
            } while (!shutDown);
        }
    }

    public void SignalShutDown()
    {
        shutDown = true;
        lock (queue)
        {
            //Signal all waiting consumers with PulseAll().
            Monitor.PulseAll(queue);
        }
    }

}

Пример использования:

class Program
{
    static void Main(string[] args)
    {
        int numProducers = 4, numConsumers = 2;
        SignaledQueue<int> queue = new SignaledQueue<int>();

        ParameterizedThreadStart produce = delegate(object obj)
        {
            Random rng = new Random((int)obj);
            int num = 0;
            while (queue.Enqueue(++num))
            {
                Thread.Sleep(rng.Next(100));
            } 
        };

        ThreadStart consume = delegate
        {
            foreach (int num in queue.DequeueAll())
            {
                Console.Write(" {0}", num);
            }
        };

        Random seedRng = new Random();
        for (int i = 0; i < numProducers; i++)
        {
            new Thread(produce).Start(seedRng.Next());
        }

        for (int i = 0; i < numConsumers; i++)
        {
            new Thread(consume).Start();
        }

        Console.ReadKey(true);
        queue.SignalShutDown();

    }
}
2 голосов
/ 29 марта 2010

Использовать семафор http://msdn.microsoft.com/library/system.threading.semaphore.aspx он был разработан именно для этого

0 голосов
/ 31 марта 2010

Я предпочитаю использовать обратный вызов, который запускает поток обработки, который продолжается до тех пор, пока он не будет захвачен, с блокировками, заставляющими одновременных читателей и писателей ждать в очереди:

public delegate void CallbackDelegate();

class Program
{
    static void Main(string[] args)
    {
        Queue<object> items = new Queue<object>();

        Processor processor = new Processor(items);
        Adder adder = new Adder(items, new CallbackDelegate(processor.CallBack));

        Thread addThread = new Thread(new ParameterizedThreadStart(adder.AddItem));
        object objectToAdd = new object();
        addThread.Start(objectToAdd);
    }
}

class Processor
{
    Queue<object> items;

    public Processor(Queue<object> itemsArg)
    {
        items = itemsArg;
    }

    public void ProcessQueue()
    {
        lock (items)
        {
            while (items.Count > 0)
            {
                object real = items.Dequeue();
                // process real
            }
        }
    }

    public void CallBack()
    {
        Thread processThread = new Thread(ProcessQueue);
        processThread.IsBackground = true;
        processThread.Start();
    }
}

class Adder
{
    Queue<object> items;
    CallbackDelegate callback;

    public Adder(Queue<object> itemsArg, CallbackDelegate callbackArg)
    {
        items = itemsArg;
        callback = callbackArg;
    }

    public void AddItem(object o)
    {
        lock (items) { items.Enqueue(o); }
        callback();
    }
}
...