Итак, я довольно новичок в многопоточности и в последнее время использую эту идею во всех моих программах. Прежде чем я начну его использовать, я действительно хочу убедиться, что это правильный эффективный способ реализации многопоточности с использованием Executor, CompletionService и BlockingQueue плюс Observer. Я приведу пример кода ниже, но позвольте мне сначала быстро объяснить, как я думаю, что это работает, и, возможно, это поможет.
Первое, что у меня есть, это BlockingQueue, все задачи добавляются в эту очередь с помощью метода add (Task task). При создании класса метод run вызывается с помощью while (true) вызова take для блокировки очереди, пока что-то не будет добавлено в очередь задач.
Как только что-то добавляется в очередь внутри run (), queue.take () возвращает элемент в очередь. Затем я беру этот элемент и передаю его классу WorkerThread, который делает что-то для него. Этот workerThread добавляется в пул CompletionService, который обрабатывает ожидание завершения потока.
Хорошо, теперь приходит часть, я не уверен, что это правильно. У меня также есть внутренний класс, который реализует runnable и запускается, когда класс инициализируется. Его работа заключается в том, чтобы зацикливаться навсегда, вызывая pool.take (). Таким образом, это по существу ждет завершения одного из WorkerThreads. Я позволил службе завершения справиться с этим. Как только метод take () получает значение, внутренний класс передает его методу-наблюдателю notify.
Это хорошая реализация.? Меня немного беспокоит то, что есть основные классы, запускаемые с циклом while (true) в очереди задач, и внутренний класс, также циклически ожидающий в пуле для получения результата от WorkerThread?
Вот пример реализации. Что ты думаешь?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}