Java многопоточный с CompletableFuture работает медленнее - PullRequest
0 голосов
/ 28 января 2020

Я пытался написать код для подсчета файлов определенного типа на моем компьютере. Я протестировал как однопоточное, так и многопоточное асинхронное решение, и кажется, что один поток работает быстрее. Что-то не так с моим кодом? и если нет, то почему он не работает быстрее?

Код ниже: AsynchFileCounter - Асинхронизированная версия. ExtensionFilter - Фильтр файлов для отображения только каталогов и файлов с указанным расширением. BasicFileCounter - Версия с одним потоком.

public class AsynchFileCounter {
    public int countFiles(String path, String extension) throws InterruptedException, ExecutionException {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter);
    }

    private int countFilesRecursive(File f, ExtensionFilter filter) throws InterruptedException, ExecutionException {
        return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
            .thenApplyAsync(files -> {
                int count = 0;
                for (File file : files) {
                    if(file.isFile())
                        count++;
                    else
                        try {
                            count += countFilesRecursive(file, filter);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                }
                return count;
            }).get();
    }

}

public class ExtensionFilter implements FileFilter {
    private String extension;
    private boolean allowDirectories;

    public ExtensionFilter(String extension, boolean allowDirectories) {
        if(extension.startsWith("."))
            extension = extension.substring(1);
        this.extension = extension;
        this.allowDirectories = allowDirectories;
    }

    @Override
    public boolean accept(File pathname) {
        if(pathname.isFile() && pathname.getName().endsWith("." + extension))
            return true;
        if(allowDirectories) {
            if(pathname.isDirectory())
                return true;
        }
        return false;
    }
}

public class BasicFileCounter {
    public int countFiles(String path, String extension) {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter);
    }

    private int countFilesRecursive(File f, ExtensionFilter filter) {
        int count = 0;
        File [] ar = f.listFiles(filter);
        for (File file : ar) {
            if(file.isFile())
                count++;
            else
                count += countFilesRecursive(file, filter);
        }
        return count;
    }
}

Ответы [ 3 ]

1 голос
/ 29 января 2020

Вы должны порождать несколько асинхронных заданий и не должны сразу ждать их завершения:

public int countFiles(String path, String extension) {
    ExtensionFilter filter = new ExtensionFilter(extension, true);
    File f = new File(path);
    return countFilesRecursive(f, filter).join();
}
private CompletableFuture<Integer> countFilesRecursive(File f, FileFilter filter) {
    return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
        .thenCompose(files -> {
            if(files == null) return CompletableFuture.completedFuture(0);
            int count = 0;
            CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount;
            for (File file : files) {
                if(file.isFile())
                    count++;
                else
                    all = countFilesRecursive(file, filter).thenCombine(all, Integer::sum);
            }
            fileCount.complete(count);
            return all;
        });
}

Обратите внимание, что File.listFiles может вернуть null.

Этот код будет подсчитывать все файлы каталога сразу, но запускайте новое асинхронное задание для подкаталогов. Результаты заданий подкаталога объединяются через thenCombine, чтобы суммировать их результаты. Для упрощения мы создаем еще один CompletableFuture, fileCount для представления локально подсчитанных файлов. thenCompose возвращает будущее, которое будет завершено с результатом будущего, возвращенным указанной функцией, поэтому вызывающая сторона может использовать join() для ожидания окончательного результата всей операции.

Для I / O операции, это может помочь использовать другой пул потоков, так как по умолчанию ForkJoinPool настроен на использование ядер ЦП, а не пропускной способности ввода / вывода:

public int countFiles(String path, String extension) {
    ExecutorService es = Executors.newFixedThreadPool(30);
    ExtensionFilter filter = new ExtensionFilter(extension, true);
    File f = new File(path);
    int count = countFilesRecursive(f, filter, es).join();
    es.shutdown();
    return count;
}
private CompletableFuture<Integer> countFilesRecursive(File f,FileFilter filter,Executor e){
    return CompletableFuture.supplyAsync(() -> f.listFiles(filter), e)
        .thenCompose(files -> {
            if(files == null) return CompletableFuture.completedFuture(0);
            int count = 0;
            CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount;
            for (File file : files) {
                if(file.isFile())
                    count++;
                else
                    all = countFilesRecursive(file, filter,e).thenCombine(all,Integer::sum);
            }
            fileCount.complete(count);
            return all;
        });
}

Нет лучшего числа потоков, это зависит от фактической среды выполнения и будет подвергаться измерению и настройке. Когда предполагается, что приложение запускается в разных средах, это должен быть настраиваемый параметр.


Но учтите, что вы можете использовать не тот инструмент для работы. Альтернативой являются задачи Fork / Join, которые поддерживают взаимодействие с пулом потоков для определения текущей насыщенности, поэтому после того, как все рабочие потоки заняты, он продолжит локальное сканирование с обычной рекурсией, а не отправляет больше асинхронных заданий:

public int countFiles(String path, String extension) {
    ExtensionFilter filter = new ExtensionFilter(extension, true);
    File f = new File(path);
    return POOL.invoke(new FileCountTask(f, filter));
}

private static final int TARGET_SURPLUS = 3,  TARGET_PARALLELISM = 30;

private static final ForkJoinPool POOL = new ForkJoinPool(TARGET_PARALLELISM);

static final class FileCountTask extends RecursiveTask<Integer> {
    private final File path;
    private final FileFilter filter;
    public FileCountTask(File file, FileFilter ff) {
        this.path = file;
        this.filter = ff;
    }

    @Override
    protected Integer compute() {
        return scan(path, filter);
    }

    private static int scan(File directory, FileFilter filter) {
        File[] fileList = directory.listFiles(filter);
        if(fileList == null || fileList.length == 0) return 0;
        List<FileCountTask> recursiveTasks = new ArrayList<>();
        int count = 0;
        for(File file: fileList) {
            if(file.isFile()) count++;
            else {
                if(getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                    FileCountTask task = new FileCountTask(file, filter);
                    recursiveTasks.add(task);
                    task.fork();
                }
                else count += scan(file, filter);
            }
        }

        for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileCountTask task = recursiveTasks.get(ix);
            if(task.tryUnfork()) task.complete(scan(task.path, task.filter));
        }

        for(FileCountTask task: recursiveTasks) {
            count += task.join();
        }
        return count;
    }
}
0 голосов
/ 30 января 2020

Я внес некоторые изменения в код:

  1. Я использую AtomicInteger для подсчета файлов вместо LongAdder.
  2. После прочтения ответа Хольгера я решил подсчитать обрабатываемые каталоги , Когда число падает до нуля, работа сделана. Поэтому я добавил блокировку и условие, чтобы основной поток знал, когда работа завершена.
  3. Я добавил проверку, возвращает ли file.listFiles () значение NULL. Я запустил код на windows, но этого никогда не было (у меня был пустой каталог, и он возвратил пустой массив), но, поскольку он использует собственный код, он может вернуть ноль в других ОС.
public class AsynchFileCounter {
    private AtomicInteger count;
    private AtomicInteger countDirectories;
    private ReentrantLock lock;
    private Condition noMoreDirectories;

    public int countFiles(String path, String extension) {
        count = new AtomicInteger();
        countDirectories = new AtomicInteger();
        lock = new ReentrantLock();
        noMoreDirectories = lock.newCondition();
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        countFilesRecursive(f, filter);
        lock.lock();
        try {
            noMoreDirectories.await();
        } catch (InterruptedException e) {}
        finally {
            lock.unlock();
        }
        return count.intValue();
    }

    private void countFilesRecursive(File f, ExtensionFilter filter) {
        countDirectories.getAndIncrement();
        CompletableFuture.supplyAsync(() -> f.listFiles(filter))
            .thenAcceptAsync(files -> countFiles(filter, files));
    }

    private void countFiles(ExtensionFilter filter, File[] files) {
        if(files != null) {
            for (File file : files) {
                if(file.isFile())
                    count.incrementAndGet();
                else 
                    countFilesRecursive(file, filter);
            }
        }
        int currentCount = countDirectories.decrementAndGet();
        if(currentCount == 0) {
            lock.lock();
            try {
                noMoreDirectories.signal();
            }
            finally {
                lock.unlock();
            }
        }
    }
}

0 голосов
/ 29 января 2020

Я понял это. так как я складываю результаты в этой строке:


count + = countFilesRecursive (file, filter);


и использую get () для получения результата, я На самом деле я жду результата, а не распараллеливаю код.

Это мой текущий код, который на самом деле работает намного быстрее, чем однопоточный код. Тем не менее, я не мог найти элегантный способ узнать, когда параллельный метод сделан.

Мне бы очень хотелось услышать, как мне это решить?

Вот ужасный способ, которым я пользуюсь:

public class AsynchFileCounter {
    private LongAdder count; 

    public int countFiles(String path, String extension) {
        count = new LongAdder();
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        countFilesRecursive(f, filter);
        // ******** The way I check whether The function is done **************** //
        int prev = 0;
        int cur = 0;
        do {
            prev = cur;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {}
            cur = (int)count.sum();
        } while(cur>prev);
        // ******************************************************************** //
        return count.intValue();
    }

    private void countFilesRecursive(File f, ExtensionFilter filter) {
        CompletableFuture.supplyAsync(() -> f.listFiles(filter))
            .thenAcceptAsync(files -> {
                for (File file : files) {
                    if(file.isFile())
                        count.increment();
                    else
                        countFilesRecursive(file, filter);
                }
            });
    }
}

...