Многопоточная операция поиска - PullRequest
3 голосов
/ 19 июля 2009

У меня есть метод, который принимает массив запросов, и мне нужно запустить их для различных веб-API поисковых систем, таких как Google или Yahoo. Чтобы распараллелить процесс, для каждого запроса создается поток, который затем join редактируется в конце, поскольку мое приложение может продолжить работу только после У меня есть результаты каждый запрос. В настоящее время у меня есть что-то вроде этого:

public abstract class class Query extends Thread {
    private String query;

    public abstract Result[] querySearchEngine();
    @Override
    public void run() {
        Result[] results = querySearchEngine(query);
        Querier.addResults(results);
    }

}

public class GoogleQuery extends Query {
    public Result querySearchEngine(String query) { 
        // access google rest API
    }
}

public class Querier {
    /* Every class that implements Query fills this array */
    private static ArrayList<Result> aggregatedResults;

    public static void addResults(Result[]) { // add to aggregatedResults }

    public static Result[] queryAll(Query[] queries) {
        /* for each thread, start it, to aggregate results */
        for (Query query : queries) {
            query.start();
        }
        for (Query query : queries) {
            query.join();
        }
        return aggregatedResults;
    }
}

Недавно я обнаружил, что в Java есть новый API для выполнения параллельных заданий. А именно, интерфейс Callable, FutureTask и ExecutorService. Мне было интересно, если этот новый API является тем, который следует использовать, и если они более эффективны, чем традиционные, Runnable и Thread.

Изучив этот новый API, я разработал следующий код (упрощенная версия):

   public abstract class Query implements Callable<Result[]> {
        private final String query; // gets set in the constructor

        public abstract Result[] querySearchEngine();
        @Override
        public Result[] call() {
            return querySearchEngine(query);
        }
    }

public class Querier {   
        private ArrayList<Result> aggregatedResults;

        public Result[] queryAll(Query[] queries) {
            List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length);
            final ExecutorService service = Executors.newFixedThreadPool(queries.length);  
            for (Query query : queries) {
                futures.add(service.submit(query));  
            }
            for (Future<Result[]> future : futures) {  
                aggregatedResults.add(future.get());  // get() is somewhat similar to join?
            }  
            return aggregatedResults;
        }
    }

Я новичок в этом API параллелизма, и я хотел бы знать, есть ли что-то, что может быть улучшено в приведенном выше коде, и лучше ли это, чем первый вариант (с использованием Thread ). Есть некоторые классы, которые я не изучал, такие как FutureTask и так далее. Я также хотел бы услышать любой совет по этому поводу.

Ответы [ 3 ]

7 голосов
/ 19 июля 2009

Несколько проблем с вашим кодом.

  1. Вероятно, вы должны использовать метод ExecutorService.invokeAll (). Затраты на создание новых потоков и нового пула потоков могут быть значительными (хотя, возможно, не сравнимы с вызовом внешних поисковых систем). invokeAll () может управлять потоками за вас.
  2. Вы, вероятно, не хотите смешивать массивы и дженерики.
  3. Вы вызываете aggregatedResults.add () вместо addAll ().
  4. Вам не нужно использовать переменные-члены, когда они могут быть локальными для вызова функции queryAll ().

Итак, что-то вроде следующего должно работать:

public abstract class Query implements Callable<List<Result>> {
    private final String query; // gets set in the constructor

    public abstract List<Result> querySearchEngine();
    @Override
    public List<Result> call() {
        return querySearchEngine(query);
    }
}

public class Querier {   
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public List<Result> queryAll(List<Query> queries) {
        List<Future<List<Result>>> futures = executor.submitAll(queries);
        List<Result> aggregatedResults = new ArrayList<Result>();
        for (Future<List<Result>> future : futures) {  
            aggregatedResults.addAll(future.get());  // get() is somewhat similar to join?
        }  
        return aggregatedResults;
    }
}
4 голосов
/ 19 июля 2009

В качестве дальнейшего улучшения вы можете использовать CompletionService Он отделяет порядок отправки и извлечения, вместо этого помещая все будущие результаты в очередь, из которой вы получаете результаты, в порядке их завершения.

3 голосов
/ 19 июля 2009

Могу ли я предложить вам использовать Future.get () с таймаутом ?

В противном случае потребуется всего одна поисковая система, которая не отвечает на запросы, чтобы остановить все (даже не нужно быть проблемой поисковой системы, если, скажем, у вас проблема с сетью на вашем конце)1005 *

...