AutoResetEvent не блокируется должным образом - PullRequest
1 голос
/ 25 августа 2010

У меня есть поток, который создает переменное количество рабочих потоков и распределяет задачи между ними.Это решается передачей потокам объекта TaskQueue , реализацию которого вы увидите ниже.

Эти рабочие потоки просто перебирают объект TaskQueue , который им был дан, выполняякаждая задача.

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Как видите, я использую объект AutoResetEvent , чтобы убедиться, что рабочие потоки не завершаются преждевременно, т. е. перед получением каких-либо задач.

В двух словах:

  • основной поток назначает задачу потоку с помощью Enqeueue -ing задачи к ее TaskQueue
  • основнойпоток уведомляет поток о том, что больше нет задач для выполнения, вызывая метод Finish () * TaskQueue
  • , рабочий поток получает следующую назначенную ему задачу, вызывая Dequeue * TaskQueueМетод 1026 * ()

Проблема в том, что метод Dequeue () часто генерирует исключение InvalidOperationException , говоря, что очередь пуста.Как вы можете видеть, я добавил некоторые записи, и оказалось, что AutoResetEvent не блокирует Dequeue () , даже если не было вызовов к его Set() метод.

Насколько я понимаю, вызов AutoResetEvent.Set () позволит продолжить работу ожидающего потока (который ранее вызывал AutoResetEvent.WaitOne ()), а затем автоматически вызывает AutoResetEvent.Reset (), блокируя следующего официанта.

Так что может быть не так?Я что-то не так понял?У меня где-нибудь есть ошибка?Я сижу над этим уже 3 часа, но не могу понять, что не так.Пожалуйста, помогите мне!

Большое спасибо!

Ответы [ 3 ]

6 голосов
/ 25 августа 2010

Ваш код очереди неверен. Вы проверяете графа под замком, затем летите по швам своих штанов, и затем ожидаете, что у заданий что-то будет. Вы не можете сохранять предположения, пока снимаете блокировку :). Ваш счетчик проверок и задач. Удаление должно происходить при блокировке:

bool TryDequeue(out Tasks task)
{
  task = null;
  lock (this.tasks) {
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  if (null == task) {
    Log.Trace ("Queue was empty");
  }
  return null != task;
 }

Код Enqueue () аналогичным образом изобилует проблемами. Ваша очередь / очередь не гарантируют прогресс (у вас будут блокированные потоки очереди, ожидающие, даже если в очереди есть элементы). Ваша подпись Enqueue() неверна. В целом ваш пост очень плохой код. Честно говоря, я думаю, что вы пытаетесь жевать больше, чем можете кусать здесь ... Ох, и никогда не войдет в систему под замком .

Я настоятельно рекомендую вам использовать ConcurrentQueue .

Если у вас нет доступа к .Net 4.0, вот реализация, с которой можно начать:

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
3 голосов
/ 25 августа 2010

Более подробный ответ от меня ожидается, но я просто хочу указать на кое-что очень важное.

Если вы используете .NET 3.5, , вы можете использовать ConcurrentQueue<T> класс . Бэкпорт включен в библиотеку расширений Rx , которая доступна для .NET 3.5.

Поскольку вы хотите блокировать поведение, вам нужно будет обернуть ConcurrentQueue<T> в BlockingCollection<T> (также доступно как часть Rx).

1 голос
/ 25 августа 2010

Похоже, вы пытаетесь скопировать очередь блокировки.Один из них уже существует в .NET 4.0 BCL как BlockingCollection .Если .NET 4.0 не подходит для вас, вы можете использовать этот код.Он использует Monitor.Wait и Monitor.Pulse метод вместо AutoResetEvent.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}

Обновление:

Я довольно уверен что невозможно реализовать очередь «производитель-потребитель», используя AutoResetEvent, если вы хотите, чтобы она была поточно-ориентированной для нескольких производителей и нескольких потребителей (я готов оказаться не прав, если кто-то может привести встречный пример).Конечно, вы увидите примеры в интернете, но все они не правы.Фактически, одна из таких попыток Microsoft имеет недостатки в том, что очередь может получить live-lock .

...