Проблема зависания алгоритма параллельного прохождения каталога - PullRequest
1 голос
/ 29 июня 2009

Я создал параллельную, рекурсивную программу просмотра каталогов и обработки файлов, которая иногда зависает после завершения всех параллельных вычислений, но «основной» поток никогда не продолжается с другими задачами.

Код в основном является параллельным агрегатором в стиле fork-join, и после завершения параллельной агрегации он должен отображать результаты в окне Swing. Проблема с агрегацией состоит в том, что ей нужно создать дерево и агрегировать статистику конечных узлов в иерархии.

Я уверен, что совершил ошибку параллелизма, но не могу ее найти. Я включил соответствующую часть своего кода в конец поста (комментарии для краткости удалены, извините за 150 строк, при необходимости я могу переместить его во внешнее местоположение).

Контекст: Java 6u13, Windows XP SP3, процессор Core 2 Duo.

Мои вопросы:

Что может быть причиной этого случайного зависания?

Есть ли лучший способ параллельного обхода каталога, возможно, в виде уже существующей библиотеки?

Будет ли инфраструктура fork-join от Doug Lea (или Java 7) лучшей инфраструктурой для агрегации / обхода каталога, и если да, то как мне переосмыслить мою реализацию - на концептуальном уровне?

Спасибо за ваше время.

И выдержка из кода:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException {
    CountUpDown count = new CountUpDown();
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        jfes[i] = new JavaFileEvaluator(files[i], count, ex);
        ex.execute(jfes[i]);
    }
    count.await();
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        final JavaFileEvaluator jfe = jfes[i];
        ex.execute(new Runnable() {
            public void run() {
                jfe.aggregate();
            }
        });

    }
    // -------------------------------------
    // this await sometimes fails to wake up
    count.await(); // <---------------------
    // -------------------------------------
    ex.shutdown();
    ex.awaitTermination(0, TimeUnit.MILLISECONDS);
    return jfes;
}
public class JavaFileEvaluator implements Runnable {
    private final File srcFile;
    private final Counters counters = new Counters();
    private final CountUpDown count;
    private final ExecutorService service;
    private List<JavaFileEvaluator> children;
    public JavaFileEvaluator(File srcFile, 
            CountUpDown count, ExecutorService service) {
        this.srcFile = srcFile;
        this.count = count;
        this.service = service;
    }
    public void run() {
        try {
            if (srcFile.isFile()) {
                JavaSourceFactory jsf = new JavaSourceFactory();
                JavaParser jp = new JavaParser(jsf);
                try {
                    counters.add(Constants.FILE_SIZE, srcFile.length());
                    countLines();
                    jp.parse(srcFile);
                    Iterator<?> it = jsf.getJavaSources();
                    while (it.hasNext()) {
                        JavaSource js = (JavaSource)it.next();
                        js.toString();
                        processSource(js);
                    }
                // Some catch clauses here
                }
            } else
            if (srcFile.isDirectory()) {
                processDirectory(srcFile);
            }
        } finally {
            count.decrement();
        }
    }
    public void processSource(JavaSource js) {
        // process source, left out for brevity
    }
    public void processDirectory(File dir) {
        File[] files = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return 
                (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
                 && !pathname.getName().startsWith("."))
                || (pathname.isFile() && pathname.getName().endsWith(".java") 
                 && pathname.canRead());
            }
        });
        if (files != null) {
            Arrays.sort(files, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    if (o1.isDirectory() && o2.isFile()) {
                        return -1;
                    } else
                    if (o1.isFile() && o2.isDirectory()) {
                        return 1;
                    }
                    return o1.getName().compareTo(o2.getName());
                }
            });
            for (File f : files) {
                if (f.isFile()) {
                    counters.add(Constants.FILE, 1);
                } else {
                    counters.add(Constants.DIR, 1);
                }
                JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
                if (children == null) {
                    children = new ArrayList<JavaFileEvaluator>();
                }
                children.add(ev);
                count.increment();
                service.execute(ev);
            }
        }
    }
    public Counters getCounters() {
        return counters;
    }
    public boolean hasChildren() {
        return children != null && children.size() > 0;
    }
    public void aggregate() {
        // recursively aggregate non-leaf nodes
        if (!hasChildren()) {
            count.decrement();
            return;
        }
        for (final JavaFileEvaluator e : children) {
            count.increment();
            service.execute(new Runnable() {
                @Override
                public void run() {
                    e.aggregate();
                }
            });
        }
        count.decrement();
    }
}
public class CountUpDown {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private final AtomicInteger count = new AtomicInteger();
    public void increment() {
        count.incrementAndGet();
    }
    public void decrement() {
        int value = count.decrementAndGet();
        if (value == 0) {
            lock.lock();
            try {
                cond.signalAll();
            } finally {
                lock.unlock();
            }
        } else
        if (value < 0) {
            throw new IllegalStateException("Counter < 0 :" + value);
        }
    }
    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (count.get() > 0) {
                cond.await();
            }
        } finally {
            lock.unlock();
        }
    }
}

Редактировать Добавлен метод hasChildren () в JavaSourceEvaluator.

1 Ответ

1 голос
/ 29 июня 2009

В агрегатном методе JavaFileEvaluator функция count.decrement () не вызывается в блоке finally. Если какие-либо исключения RuntimeException будут выброшены внутри агрегатной функции (возможно, в методе hasChildren, тело которого я не вижу?), Вызов декремента никогда не произойдет, и CountUpDown будет оставаться в ожидании бесконечно долго. Это может быть причиной случайного зависания, которое вы видите.

Что касается второго вопроса, я не знаю ни одной библиотеки в java для этого, но я действительно не смотрел, извините за отсутствие ответа, но у меня не было возможности использовать до этого.

Что касается третьего вопроса, я думаю, используете ли вы платформу fork-join, предоставленную кем-то другим, или продолжаете предоставлять собственную платформу параллелизма, самым большим преимуществом будет разделение логики, выполняющей работу по обходу каталоги из логики, связанной с управлением параллелизмом. Код, который вы предоставили, использует класс CountUpDown для отслеживания того, когда все потоки завершены, и в итоге вы получите вызовы для увеличения / уменьшения, разбросанные по всем методам, связанным с обходом каталога, что приведет к кошмарам, отслеживающим ошибки. Переход к инфраструктуре fork-join java7 заставит вас создать класс, который имеет дело только с фактической логикой обхода, и оставит логику параллелизма вплоть до фреймворка, что может быть хорошим способом для вас. Другой вариант - продолжать следовать тому, что у вас есть, но провести четкое разграничение между логикой управления и рабочей логикой, которая поможет вам отследить и исправить ошибки такого рода.

...