Использование Java ThreadPool - PullRequest
7 голосов
/ 04 августа 2010

Я пытаюсь написать многопоточный веб-сканер.

Мой основной класс записи имеет следующий код:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
exec.execute(new URLCrawler(this, url));
}

URLCrawler выбирает указанный URL-адрес, анализирует HTML-ссылки и извлекает ссылкииз него и планирует невидимые ссылки обратно на границу.

Граница - это очередь неисследованных URL-адресов.Проблема в том, как написать метод get ().Если очередь пуста, она должна дождаться окончания работы любых URLCrawlers, а затем повторить попытку.Он должен возвращать ноль только тогда, когда очередь пуста и в настоящее время нет активного URLCrawler.

Моей первой идеей было использование AtomicInteger для подсчета текущего числа работающих URLCrawlers и вспомогательного объекта для notifyAll () / wait () звонки.Каждый сканер при запуске увеличивает количество текущих работающих URLCrawlers, а при выходе уменьшает его и уведомляет объект, который он завершил.

Но я читал, что notify () / notifyAll () и wait () несколькоустаревшие методы для связи между потоками.

Что я должен использовать в этом шаблоне работы?Это похоже на производителей M и N потребителей, вопрос в том, как бороться с истощением производителей.

Ответы [ 6 ]

3 голосов
/ 04 августа 2010

Один из вариантов - сделать «границу» блокирующей очередью, поэтому любой поток, пытающийся «получить» ее, заблокируется.Как только любой другой URLCrawler помещает объекты в эту очередь, любые другие потоки будут автоматически уведомлены (с удаленным объектом в очереди)

3 голосов
/ 04 августа 2010

Я не уверен, что понимаю ваш дизайн, но это может быть работа для <a href="http://download-llnw.oracle.com/javase/6/docs/api/java/util/concurrent/Semaphore.html" rel="nofollow noreferrer">Semaphore</a>

2 голосов
/ 07 декабря 2012

Вопрос немного старый, но я думаю, что нашел какое-то простое, работающее решение:

Расширьте класс ThreadPoolExecutor, как показано ниже.Новая функциональность позволяет сохранять количество активных задач (к сожалению, при условии, что getActiveCount() ненадежен).Если taskCount.get() == 0 и задач больше нет, это означает, что ничего не поделаешь, и исполнитель завершает работу.У вас есть свои критерии выхода.Кроме того, если вы создадите своего исполнителя, но не сможете отправить какие-либо задачи, он не заблокирует:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}

Еще одна вещь, которую вам нужно сделать, - реализовать ваш Runnable так, чтобы он сохранял ссылкуExecutor вы используете для того, чтобы иметь возможность отправлять новые задачи.Вот макет:

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}
2 голосов
/ 05 августа 2010

Я думаю, что основным строительным блоком для вашего варианта использования является «защелка», похожая на CountDownLatch, но в отличие от CountDownLatch, которая также позволяет увеличивать счетчик.

Интерфейсдля такой защелки может быть

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

Допустимые значения для счетчиков будут 0 и выше.Метод await () позволит вам блокировать, пока счетчик не упадет до нуля.

Если у вас есть такая защелка, ваш сценарий использования может быть описан довольно легко.Я также подозреваю, что очередь (граница) может быть устранена в этом решении (исполнитель в любом случае предоставляет ее, поэтому она несколько избыточна).Я бы переписал вашу основную подпрограмму как

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Ваш URLCrawler будет использовать защелку следующим образом:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

Что касается реализаций защелки, может быть несколько возможных реализаций,начиная от того, который основан на wait () и notifyAll (), который использует Lock and Condition, до реализации, которая использует AbstractQueuedSynchronizer.Все эти реализации, я думаю, будут довольно простыми.Обратите внимание, что версия wait () - notifyAll () и версия Lock-Condition будут основаны на взаимном исключении, тогда как версия AQS будет использовать CAS (сравнение и замена) и, следовательно, может лучше масштабироваться в определенных ситуациях.

2 голосов
/ 04 августа 2010

Я думаю, что использование ожидания / уведомления оправдано в этом случае.Не могу придумать какой-либо прямой способ сделать это, используя juc
В классе давайте назовем Coordinator:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            wait();//spurious wake up is okay
            //waked up for whatever reason. Try again
            waiting--;
            return true;
        }
    }

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
} 

затем,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)
0 голосов
/ 01 марта 2011

Я хотел бы предложить AdaptiveExecuter. На основе значения признака вы можете выбрать сериализацию или параллализацию потока для выполнения. В приведенном ниже примере PUID - это строка / объект, который я хотел использовать для принятия этого решения. Вы можете изменить логику в соответствии с вашим кодом. Некоторые части кода прокомментированы, чтобы позволить дальнейшие эксперименты.

класс AdaptiveExecutor реализует Executor { конечные задачи очереди = new LinkedBlockingQueue (); Работоспособный активный; // ExecutorService threadExecutor = Executors.newCachedThreadPool (); static ExecutorService threadExecutor = Executors.newFixedThreadPool (4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

private void serialize(Thread th) {
    try {
        Thread activeThread = new Thread(active);

        th.wait(200);
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {

    }
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controlls it
}

}

...