Вы должны порождать несколько асинхронных заданий и не должны сразу ждать их завершения:
public int countFiles(String path, String extension) {
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
return countFilesRecursive(f, filter).join();
}
private CompletableFuture<Integer> countFilesRecursive(File f, FileFilter filter) {
return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
.thenCompose(files -> {
if(files == null) return CompletableFuture.completedFuture(0);
int count = 0;
CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount;
for (File file : files) {
if(file.isFile())
count++;
else
all = countFilesRecursive(file, filter).thenCombine(all, Integer::sum);
}
fileCount.complete(count);
return all;
});
}
Обратите внимание, что File.listFiles
может вернуть null
.
Этот код будет подсчитывать все файлы каталога сразу, но запускайте новое асинхронное задание для подкаталогов. Результаты заданий подкаталога объединяются через thenCombine
, чтобы суммировать их результаты. Для упрощения мы создаем еще один CompletableFuture
, fileCount
для представления локально подсчитанных файлов. thenCompose
возвращает будущее, которое будет завершено с результатом будущего, возвращенным указанной функцией, поэтому вызывающая сторона может использовать join()
для ожидания окончательного результата всей операции.
Для I / O операции, это может помочь использовать другой пул потоков, так как по умолчанию ForkJoinPool
настроен на использование ядер ЦП, а не пропускной способности ввода / вывода:
public int countFiles(String path, String extension) {
ExecutorService es = Executors.newFixedThreadPool(30);
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
int count = countFilesRecursive(f, filter, es).join();
es.shutdown();
return count;
}
private CompletableFuture<Integer> countFilesRecursive(File f,FileFilter filter,Executor e){
return CompletableFuture.supplyAsync(() -> f.listFiles(filter), e)
.thenCompose(files -> {
if(files == null) return CompletableFuture.completedFuture(0);
int count = 0;
CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount;
for (File file : files) {
if(file.isFile())
count++;
else
all = countFilesRecursive(file, filter,e).thenCombine(all,Integer::sum);
}
fileCount.complete(count);
return all;
});
}
Нет лучшего числа потоков, это зависит от фактической среды выполнения и будет подвергаться измерению и настройке. Когда предполагается, что приложение запускается в разных средах, это должен быть настраиваемый параметр.
Но учтите, что вы можете использовать не тот инструмент для работы. Альтернативой являются задачи Fork / Join, которые поддерживают взаимодействие с пулом потоков для определения текущей насыщенности, поэтому после того, как все рабочие потоки заняты, он продолжит локальное сканирование с обычной рекурсией, а не отправляет больше асинхронных заданий:
public int countFiles(String path, String extension) {
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
return POOL.invoke(new FileCountTask(f, filter));
}
private static final int TARGET_SURPLUS = 3, TARGET_PARALLELISM = 30;
private static final ForkJoinPool POOL = new ForkJoinPool(TARGET_PARALLELISM);
static final class FileCountTask extends RecursiveTask<Integer> {
private final File path;
private final FileFilter filter;
public FileCountTask(File file, FileFilter ff) {
this.path = file;
this.filter = ff;
}
@Override
protected Integer compute() {
return scan(path, filter);
}
private static int scan(File directory, FileFilter filter) {
File[] fileList = directory.listFiles(filter);
if(fileList == null || fileList.length == 0) return 0;
List<FileCountTask> recursiveTasks = new ArrayList<>();
int count = 0;
for(File file: fileList) {
if(file.isFile()) count++;
else {
if(getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
FileCountTask task = new FileCountTask(file, filter);
recursiveTasks.add(task);
task.fork();
}
else count += scan(file, filter);
}
}
for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
FileCountTask task = recursiveTasks.get(ix);
if(task.tryUnfork()) task.complete(scan(task.path, task.filter));
}
for(FileCountTask task: recursiveTasks) {
count += task.join();
}
return count;
}
}