RecursiveAction / ForkJoinPool с блокирующим вводом / выводом - PullRequest
3 голосов
/ 09 марта 2012

У меня есть Java-клиент, который должен рекурсивно вызывать сервер для извлечения большого графа данных - для этого требуется примерно тысяча вызовов. У меня нет контроля над сервером, и это требуется для критического сценария восстановления после сбоя.

Моя проблема в том, что мне нужно заблокировать исходную ветку до завершения всех вызовов.

Абстракции RecursiveAction и ForkJoinPool из java.util.concurrent - это именно то, что мне нужно, за исключением того, что они предназначены для параллелизма ЦП и запрещают использование блокирующего ввода-вывода.

Итак, как лучше всего реализовать рекурсивные сетевые вызовы с блокировкой инициирующего потока до завершения всех вызовов?

Дополнительная контекстная информация:

  • Я не могу изменить сервер.
  • Сервер разрешает и поддерживает этот тип сложных запросов.
  • Я ограничу количество одновременных сетевых вызовов примерно 10-30.
  • Кэширование данных на диске невозможно.

Дополнительные соображения: будет ли подходящим однофазный фазер в сочетании с ThreadPoolExecutor? Задачи вызова будут вызывать Phaser.register (), выполнять вызов, отправлять дочерние задачи, а затем вызывать Phaser.arrive (). Инициирующий поток будет вызывать Phaser.awaitAdvance (1). Это был бы самый подходящий подход?

Ответы [ 2 ]

4 голосов
/ 10 марта 2012

Получилось хорошо, используя Фазеры JDK1.7.Шаблон, который я использовал, был примерно таким:

private void loadGraphFromServer() {
    final Phaser phaser = new Phaser(1); // "1" registers the calling thread
    for (final Item item : getDataListFromServer()) {
        phaser.register();
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    getMoreDataFromServer(item.getSomeId());
                    // more nested loops/tasks/calls here...
                }
                finally {
                    phaser.arrive();
                }
            }
        });
    }
    phaser.arriveAndAwaitAdvance(); // blocks until all tasks are complete
}
0 голосов
/ 09 марта 2012

Я бы попробовал использовать пул исполнителей фиксированного размера.Вы можете установить максимальный размер от 10 до 30 потоков и распределить его по всем 1000 запросам или добавить 1 запрос, который создаст еще два, и эти два, и т. Д. Вы можете дождаться завершения всех этих запросов с помощью shutdown () и awaitTermination()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...