Я начинаю с примера кода C # здесь . Я пытаюсь адаптировать его по нескольким причинам: 1) в моем сценарии все задачи будут помещены в очередь заранее, прежде чем потребители начнут работать, и 2) я хотел бы абстрагировать работника в отдельный класс вместо того, чтобы иметь raw Thread
члены в классе WorkerQueue
.
Моя очередь, похоже, не сама себя распределяет, она просто зависает, и когда я ломаюсь в Visual Studio, она застревает в строке _th.Join()
для WorkerThread
# 1. Кроме того, есть ли лучший способ организовать это? Что-то в выставлении методов WaitOne()
и Join()
кажется неправильным, но я не мог придумать подходящий способ взаимодействия WorkerThread
с очередью.
Кроме того, если не считать - если я вызову q.Start(#)
в верхней части блока using
, то только некоторые потоки срабатывают каждый раз (например, потоки 1, 2 и 8 обрабатывают каждую задачу). Почему это? Это какое-то состояние гонки или я что-то не так делаю?
using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;
namespace QueueTest
{
class Program
{
static void Main(string[] args)
{
using (WorkQueue q = new WorkQueue())
{
q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });
Random r = new Random();
foreach (int i in Enumerable.Range(1, 10))
q.Enqueue(r.Next(100, 500));
Console.WriteLine("All jobs queued");
q.Start(8);
}
}
}
class WorkQueue : IDisposable
{
private Queue<int> _jobs = new Queue<int>();
private int _job_count;
private EventWaitHandle _wh = new AutoResetEvent(false);
private object _lock = new object();
private List<WorkerThread> _th;
public event Action Finished;
public WorkQueue()
{
}
public void Start(int num_threads)
{
_job_count = _jobs.Count;
_th = new List<WorkerThread>(num_threads);
foreach (int i in Enumerable.Range(1, num_threads))
{
_th.Add(new WorkerThread(i, this));
_th[_th.Count - 1].JobFinished += new Action<int>(WorkQueue_JobFinished);
}
}
void WorkQueue_JobFinished(int obj)
{
lock (_lock)
{
_job_count--;
if (_job_count == 0 && Finished != null)
Finished();
}
}
public void Enqueue(int job)
{
lock (_lock)
_jobs.Enqueue(job);
_wh.Set();
}
public void Dispose()
{
Enqueue(Int32.MinValue);
_th.ForEach(th => th.Join());
_wh.Close();
}
public int GetNextJob()
{
lock (_lock)
{
if (_jobs.Count > 0)
return _jobs.Dequeue();
else
return Int32.MinValue;
}
}
public void WaitOne()
{
_wh.WaitOne();
}
}
class WorkerThread
{
private Thread _th;
private WorkQueue _q;
private int _i;
public event Action<int> JobFinished;
public WorkerThread(int i, WorkQueue q)
{
_i = i;
_q = q;
_th = new Thread(DoWork);
_th.Start();
}
public void Join()
{
_th.Join();
}
private void DoWork()
{
while (true)
{
int job = _q.GetNextJob();
if (job != Int32.MinValue)
{
Console.WriteLine("Thread {0} Got job {1}", _i, job);
Thread.Sleep(job * 10); // in reality would to actual work here
if (JobFinished != null)
JobFinished(job);
}
else
{
Console.WriteLine("Thread {0} no job available", _i);
_q.WaitOne();
}
}
}
}
}