У меня есть поток, который создает переменное количество рабочих потоков и распределяет задачи между ними.Это решается передачей потокам объекта TaskQueue , реализацию которого вы увидите ниже.
Эти рабочие потоки просто перебирают объект TaskQueue , который им был дан, выполняякаждая задача.
private class TaskQueue : IEnumerable<Task>
{
public int Count
{
get
{
lock(this.tasks)
{
return this.tasks.Count;
}
}
}
private readonly Queue<Task> tasks = new Queue<Task>();
private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);
private bool isFinishing = false;
private bool isFinished = false;
public void Enqueue(Task task)
{
Log.Trace("Entering Enqueue, lock...");
lock(this.tasks)
{
Log.Trace("Adding task, current count = {0}...", Count);
this.tasks.Enqueue(task);
if (Count == 1)
{
Log.Trace("Count = 1, so setting the wait handle...");
this.taskWaitHandle.Set();
}
}
Log.Trace("Exiting enqueue...");
}
public Task Dequeue()
{
Log.Trace("Entering Dequeue...");
if (Count == 0)
{
if (this.isFinishing)
{
Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
this.isFinished = true;
return new Task();
}
Log.Trace("Count = 0, lets wait for a task...");
this.taskWaitHandle.WaitOne();
Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);
if(this.isFinishing)
{
Log.Trace("Finishing - isCompleted set, returning empty task.");
this.isFinished = true;
return new Task();
}
}
Log.Trace("Entering task lock...");
lock(this.tasks)
{
Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
return this.tasks.Dequeue();
}
}
public void Finish()
{
Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
this.isFinishing = true;
if (Count == 0)
{
this.taskWaitHandle.Set();
}
}
public IEnumerator<Task> GetEnumerator()
{
while(true)
{
Task t = Dequeue();
if(this.isFinished)
{
yield break;
}
yield return t;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
Как видите, я использую объект AutoResetEvent , чтобы убедиться, что рабочие потоки не завершаются преждевременно, т. е. перед получением каких-либо задач.
В двух словах:
- основной поток назначает задачу потоку с помощью Enqeueue -ing задачи к ее TaskQueue
- основнойпоток уведомляет поток о том, что больше нет задач для выполнения, вызывая метод Finish () * TaskQueue
- , рабочий поток получает следующую назначенную ему задачу, вызывая Dequeue * TaskQueueМетод 1026 * ()
Проблема в том, что метод Dequeue () часто генерирует исключение InvalidOperationException , говоря, что очередь пуста.Как вы можете видеть, я добавил некоторые записи, и оказалось, что AutoResetEvent не блокирует Dequeue () , даже если не было вызовов к его Set() метод.
Насколько я понимаю, вызов AutoResetEvent.Set () позволит продолжить работу ожидающего потока (который ранее вызывал AutoResetEvent.WaitOne ()), а затем автоматически вызывает AutoResetEvent.Reset (), блокируя следующего официанта.
Так что может быть не так?Я что-то не так понял?У меня где-нибудь есть ошибка?Я сижу над этим уже 3 часа, но не могу понять, что не так.Пожалуйста, помогите мне!
Большое спасибо!