Очередь задач Java, пул потоков и потоки с обратным вызовом для информирования о начале новой задачи - PullRequest
0 голосов
/ 24 февраля 2012

Я попытался найти способ загрузки нескольких веб-страниц в нескольких потоках с определенным максимальным пределом потоков таким образом, чтобы по окончании загрузки была загружена новая страница. Должны также быть другие потоки постобработки для загруженного контента после загрузки страницы, чтобы весь процесс был связан.

Как бы я хотел это сделать:

  • Очередь задач содержит страницы, которые должны быть загружены
  • Threadpool имеет определенное количество потоков, загружающих страницы в очереди задач (загрузка страниц занимает некоторое время, поэтому количество потоков может быть намного больше, чем количество ядер процессора)
  • Когда загрузка страницы завершена, поток должен уведомить об этом, чтобы вместо нее можно было запустить новую задачу из очереди
  • Когда загрузка страницы завершена, содержимое следует перенести в другую очередь задач для последующей обработки

  • В другом пуле потоков столько потоков, сколько в ядре у процессора (предположим, что для постобработки быстрее всего иметь только один поток на ядро), этот пул потоков выполняет постобработку загруженных страниц.

  • Когда постобработка страницы завершена, поток должен уведомить об этом, чтобы еще одна страница в очереди могла быть постобработана

  • Когда все страницы были загружены (очередь пуста), первый пул потоков может быть отключен, другой пул потоков может быть отключен, когда обе очереди задач пусты (все страницы были загружены и постобработаны)

У меня есть что-то вроде:

            for (int j = 0; j < threads.length; j++) {
            threads[j].start();
        }

        for (int j = 0; j < threads.length; j++) {
            threads[j].join();
        }

Но таким образом все страницы для загрузки находятся в отдельных потоках одновременно, и я хочу ограничить количество потоков. Что еще более важно, я хочу повторно использовать потоки и заставить поток выполнять следующую задачу, когда одна задача завершена. Я мог бы сделать это с помощью цикла while, но это то, чего я пытаюсь избежать, я не хочу, чтобы цикл while проверял, есть ли в очереди больше задач и свободен ли поток. Можно ли использовать какой-то обратный вызов, чтобы поток возвращал пул, который завершен, и возвращает данные. Я также хочу, чтобы задачи загрузки сохраняли содержимое в структуре данных и добавляли его в очередь задач последующей обработки.

Лучшие ресурсы, которые я нашел, это: Резьба пулов Обратный вызов

Но я не знаю, возможно ли вообще создать его так, как я хочу. Я застрял в размышлениях о функциональных указателях.

Ответы [ 3 ]

3 голосов
/ 24 февраля 2012

Не используйте методы потоков низкого уровня, чтобы сделать это. Иметь пул потоков downloadExecutor и отправлять экземпляры DownloadTask (реализующие Runnable или Callable) в этот пул.

В конце кода DownloadTask отправьте экземпляр PostProcessPageTask (снова реализуя реализацию Runnable или Callable) во второй postProcessExecutor пул потоков.

Вы можете использовать один или два экземпляра CountDownLatch, которые по завершении каждой задачи будут уменьшаться, и иметь основной поток, ожидающий на этой (или этих) защелках, чтобы знать, когда пулы потоков должны быть закрыты.

См. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html и docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html для получения дополнительной информации.

1 голос
/ 24 февраля 2012

Вы можете использовать Guava's ListenableFutures .

Сначала вам нужно отправить задачи загрузки в ListenableExecutorService , а затем преобразовать полученные фьючерсы с постпроцессором через Futures.transform .

ListenableExecutorService dlPool = MoreExecutors.listeningDecorator(firstPool);
ListenableExecutorService procPool = MoreExecutors.listeningDecorator(secondPool);

List<ListenableFuture<Result>> results = new ArrayList<...>();
for (String url : urls) {
  // download task
  ListenableFuture<String> html = dlPool.submit(...);
  // post process
  ListenableFuture<Result> result = Futures.transform(html,
    new Function<String, Result>() {
      ... // post process
    }, procPool);
  results.add(result);
}

// blocks until all results are processed
List<Result> processed = Futures.allAsList(results).get();

firstPool.shutdownNow();
secondPool.shutdownNow();
0 голосов
/ 24 февраля 2012

Не пытайтесь кодировать этот тип общей инфраструктуры вручную.

Java 5 и выше поставляется с прекрасным пакетом java.util.concurrent

Это должно быть первое, к чему вы обращаетесь при создании многопоточных приложений.

Он имеет множество общих инструментов, таких как threadpools (которые выполняют объекты Runnable или Callable), и сделает большую работу за вас.

В Интернете есть множество отличных бесплатных ресурсов, или, если вы предпочитаете книги, «Параллелизм Java на практике» Брайана Гетца считается одним из лучших.

...