Сбор ссылок с веб-страниц с использованием пула потоков Java - PullRequest
0 голосов
/ 06 января 2012

Я программирую сборщик ссылок с указанного количества страниц. Чтобы сделать его более эффективным, я использую ThreadPool с фиксированным размером. Поскольку я действительно новичок в области многопоточности, у меня есть проблемы с исправлением некоторых проблем. Так что моя идея состоит в том, что каждый поток делает одно и то же: подключается к странице и собирает каждый URL. После этого URL добавляются в очередь для следующего потока.

Но это не работает. Сначала программа анализирует baseurl и добавляет URL-адреса из него. Но сначала я хочу сделать это только с LinksToVisit.add (baseurl) и запустить его с помощью threadpool, но он всегда опрашивает очередь и потоки не добавляют ничего нового, поэтому в верхней части очереди значение null. И я не знаю, почему: (

Я пытался сделать это с ArrayBlockingQueue, но безуспешно. Исправление этого с помощью анализа базового URL-адреса не является хорошим решением, потому что когда на baseurl есть, например, только одна ссылка, она не следует за ним. Поэтому я думаю, что поступаю неправильно или упускаю что-то важное. В качестве HTML-парсера я использую Jsoup. Спасибо за ответы.

Источник (удалены ненужные методы):

package collector;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.Map;
import java.util.Scanner;
import java.util.Map.Entry;
import java.util.concurrent.*;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;


public class Collector {

private String baseurl;
private int links;
private int cvlinks;
private double time;
private int chcount;
private static final int NTHREADS = Runtime.getRuntime().availableProcessors()*2;
private ConcurrentLinkedQueue<String> LinksToVisit = new ConcurrentLinkedQueue<String>();
private ConcurrentSkipListMap<String, Double> SortedCharMap = new ConcurrentSkipListMap<String, Double>();
private ConcurrentHashMap<String, Double> CharMap = new ConcurrentHashMap<String, Double>();

public Collector(String url, int links) {
    this.baseurl = url;
    this.links = links;
    this.cvlinks = 0;
    this.chcount = 0;

    try {
        Document html = Jsoup.connect(url).get();

        if(cvlinks != links){
            Elements collectedLinks = html.select("a[href]");
            for(Element link:collectedLinks){
                if(cvlinks == links) break;
                else{
                    String current = link.attr("abs:href");
                    if(!current.equals(url) && current.startsWith(baseurl)&& !current.contains("#")){
                        LinksToVisit.add(current);
                        cvlinks++;
                    }
                }
            }
        }

        AnalyzeDocument(html, url);
    } catch (IOException e) {
        e.printStackTrace();
    }
    CollectFromWeb();
}

private void AnalyzeDocument(Document doc,String url){
    String text = doc.body().text().toLowerCase().replaceAll("[^a-z]", "").trim();
    chcount += text.length();
    String chars[] = text.split("");
    CharCount(chars);

}
private void CharCount(String[] chars) {
    for(int i = 1; i < chars.length; i++) {
        if(!CharMap.containsKey(chars[i]))  
            CharMap.put(chars[i],1.0);
        else
            CharMap.put(chars[i], CharMap.get(chars[i]).doubleValue()+1);
    }
}

private void CollectFromWeb(){
    long startTime = System.nanoTime();
    ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
     CollectorThread[] workers = new CollectorThread[this.links];
    for (int i = 0; i < this.links; i++) {
        if(!LinksToVisit.isEmpty()){
            int j = i+1;
            System.out.println("Collecting from "+LinksToVisit.peek()+" ["+j+"/"+links+"]");
            //Runnable worker = new CollectorThread(LinksToVisit.poll());   
            workers[i] = new CollectorThread(LinksToVisit.poll());
            executor.execute(workers[i]);
        }
        else break;
    }
    executor.shutdown();
    while (!executor.isTerminated()) {}

    SortedCharMap.putAll(CharMap);

    this.time =(System.nanoTime() - startTime)*10E-10;
}

class CollectorThread implements Runnable{
    private Document html;
    private String url;

    public CollectorThread(String url){
        this.url = url;
        try {
            this.html = Jsoup.connect(url).get();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        if(cvlinks != links){
            Elements collectedLinks = html.select("a[href]");
            for(Element link:collectedLinks){
                if(cvlinks == links) break;
                else{
                    String current = link.attr("abs:href");
                    if(!current.equals(url) && current.startsWith(baseurl)&& !current.contains("#")){
                        LinksToVisit.add(current);
                        cvlinks++;
                    }
                }
            }
        }

        AnalyzeDocument(html, url);
    }
}

}

1 Ответ

1 голос
/ 06 января 2012

Вместо использования очереди LinksToVisit, просто позвоните executor.execute(new CollectorThread(current)) напрямую из CollectorThread.run(). ExecutorService имеет свою собственную внутреннюю очередь задач, которые он будет выполнять по мере доступности потоков.

Другая проблема заключается в том, что вызов shutdown () после добавления первого набора URL-адресов в очередь предотвратит добавление новых задач исполнителю. Вы можете исправить это, вместо этого выключив исполнителя, когда он опустошил свою очередь:

class Queue extends ThreadPoolExecutor {
    Queue(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
                new LinkedBlockingQueue<Runnable>());
    }

    protected void afterExecute(Runnable r, Throwable t) {
        if(getQueue().isEmpty()) {
            shutdown();
        }
    }
}
...