Java правильно использовать ExecutorService, CompletionService, BlockingQueue и Observer? - PullRequest
1 голос
/ 02 сентября 2011

Итак, я довольно новичок в многопоточности и в последнее время использую эту идею во всех моих программах. Прежде чем я начну его использовать, я действительно хочу убедиться, что это правильный эффективный способ реализации многопоточности с использованием Executor, CompletionService и BlockingQueue плюс Observer. Я приведу пример кода ниже, но позвольте мне сначала быстро объяснить, как я думаю, что это работает, и, возможно, это поможет.

Первое, что у меня есть, это BlockingQueue, все задачи добавляются в эту очередь с помощью метода add (Task task). При создании класса метод run вызывается с помощью while (true) вызова take для блокировки очереди, пока что-то не будет добавлено в очередь задач.

Как только что-то добавляется в очередь внутри run (), queue.take () возвращает элемент в очередь. Затем я беру этот элемент и передаю его классу WorkerThread, который делает что-то для него. Этот workerThread добавляется в пул CompletionService, который обрабатывает ожидание завершения потока.

Хорошо, теперь приходит часть, я не уверен, что это правильно. У меня также есть внутренний класс, который реализует runnable и запускается, когда класс инициализируется. Его работа заключается в том, чтобы зацикливаться навсегда, вызывая pool.take (). Таким образом, это по существу ждет завершения одного из WorkerThreads. Я позволил службе завершения справиться с этим. Как только метод take () получает значение, внутренний класс передает его методу-наблюдателю notify.

Это хорошая реализация.? Меня немного беспокоит то, что есть основные классы, запускаемые с циклом while (true) в очереди задач, и внутренний класс, также циклически ожидающий в пуле для получения результата от WorkerThread?

Вот пример реализации. Что ты думаешь?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    doneSignal = new CountDownLatch(1);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
    if(!isRunning){
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    numThreadsInPool = numThreads;
    }
}


public void start()
{
    if(!isRunning){
        responseWorkerThread.start();
        new Thread(this).start();
        isRunning = true;
    }

}


public void add(VulnInfo info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            VulnInfo info = queue.take();
            Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
            //System.out.println("submitting to pooler: " + info.getID());
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            VulnInfo vulnInfo = null;
            try {
                //System.out.println("taking finished request");
                Future<VulnInfo>    tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                //System.out.println("updating all observers: "  + vulnInfo.getID());
                updateObservers(vulnInfo);
            }



        }
    }

}

1 Ответ

2 голосов
/ 02 сентября 2011

Судя по моему опыту, ваше решение кажется правильным. У меня есть три комментария / предложения:

  1. Как только вы создадите новый поток выполнения responseWorkerThread = new Thread(schedulerWorker) и responseWorkerThread.start(), вы по существу разбили эти два цикла. Эта часть выглядит хорошо. Вы, похоже, правильно используете API Executor s, но похоже, что вам может понадобиться еще немного кода для остановки потока HttpScheduledWorker и для отключения ExecutionCompletionService как части класса HttpSchedulerThreaded.
  2. Я не уверен, что использование queue действительно необходимо. ExecutionCompletionService уже использует BlockingQueue для управления задачами, которые ему переданы.
  3. Ваш "вопрос" может лучше подойти на бета-странице Code Code .
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...