Я создал параллельную, рекурсивную программу просмотра каталогов и обработки файлов, которая иногда зависает после завершения всех параллельных вычислений, но «основной» поток никогда не продолжается с другими задачами.
Код в основном является параллельным агрегатором в стиле fork-join, и после завершения параллельной агрегации он должен отображать результаты в окне Swing. Проблема с агрегацией состоит в том, что ей нужно создать дерево и агрегировать статистику конечных узлов в иерархии.
Я уверен, что совершил ошибку параллелизма, но не могу ее найти. Я включил соответствующую часть своего кода в конец поста (комментарии для краткости удалены, извините за 150 строк, при необходимости я могу переместить его во внешнее местоположение).
Контекст: Java 6u13, Windows XP SP3, процессор Core 2 Duo.
Мои вопросы:
Что может быть причиной этого случайного зависания?
Есть ли лучший способ параллельного обхода каталога, возможно, в виде уже существующей библиотеки?
Будет ли инфраструктура fork-join от Doug Lea (или Java 7) лучшей инфраструктурой для агрегации / обхода каталога, и если да, то как мне переосмыслить мою реализацию - на концептуальном уровне?
Спасибо за ваше время.
И выдержка из кода:
private static JavaFileEvaluator[] processFiles(File[] files)
throws InterruptedException {
CountUpDown count = new CountUpDown();
ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
for (int i = 0; i < jfes.length; i++) {
count.increment();
jfes[i] = new JavaFileEvaluator(files[i], count, ex);
ex.execute(jfes[i]);
}
count.await();
for (int i = 0; i < jfes.length; i++) {
count.increment();
final JavaFileEvaluator jfe = jfes[i];
ex.execute(new Runnable() {
public void run() {
jfe.aggregate();
}
});
}
// -------------------------------------
// this await sometimes fails to wake up
count.await(); // <---------------------
// -------------------------------------
ex.shutdown();
ex.awaitTermination(0, TimeUnit.MILLISECONDS);
return jfes;
}
public class JavaFileEvaluator implements Runnable {
private final File srcFile;
private final Counters counters = new Counters();
private final CountUpDown count;
private final ExecutorService service;
private List<JavaFileEvaluator> children;
public JavaFileEvaluator(File srcFile,
CountUpDown count, ExecutorService service) {
this.srcFile = srcFile;
this.count = count;
this.service = service;
}
public void run() {
try {
if (srcFile.isFile()) {
JavaSourceFactory jsf = new JavaSourceFactory();
JavaParser jp = new JavaParser(jsf);
try {
counters.add(Constants.FILE_SIZE, srcFile.length());
countLines();
jp.parse(srcFile);
Iterator<?> it = jsf.getJavaSources();
while (it.hasNext()) {
JavaSource js = (JavaSource)it.next();
js.toString();
processSource(js);
}
// Some catch clauses here
}
} else
if (srcFile.isDirectory()) {
processDirectory(srcFile);
}
} finally {
count.decrement();
}
}
public void processSource(JavaSource js) {
// process source, left out for brevity
}
public void processDirectory(File dir) {
File[] files = dir.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return
(pathname.isDirectory() && !pathname.getName().startsWith("CVS")
&& !pathname.getName().startsWith("."))
|| (pathname.isFile() && pathname.getName().endsWith(".java")
&& pathname.canRead());
}
});
if (files != null) {
Arrays.sort(files, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
if (o1.isDirectory() && o2.isFile()) {
return -1;
} else
if (o1.isFile() && o2.isDirectory()) {
return 1;
}
return o1.getName().compareTo(o2.getName());
}
});
for (File f : files) {
if (f.isFile()) {
counters.add(Constants.FILE, 1);
} else {
counters.add(Constants.DIR, 1);
}
JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
if (children == null) {
children = new ArrayList<JavaFileEvaluator>();
}
children.add(ev);
count.increment();
service.execute(ev);
}
}
}
public Counters getCounters() {
return counters;
}
public boolean hasChildren() {
return children != null && children.size() > 0;
}
public void aggregate() {
// recursively aggregate non-leaf nodes
if (!hasChildren()) {
count.decrement();
return;
}
for (final JavaFileEvaluator e : children) {
count.increment();
service.execute(new Runnable() {
@Override
public void run() {
e.aggregate();
}
});
}
count.decrement();
}
}
public class CountUpDown {
private final Lock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();
private final AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet();
}
public void decrement() {
int value = count.decrementAndGet();
if (value == 0) {
lock.lock();
try {
cond.signalAll();
} finally {
lock.unlock();
}
} else
if (value < 0) {
throw new IllegalStateException("Counter < 0 :" + value);
}
}
public void await() throws InterruptedException {
lock.lock();
try {
if (count.get() > 0) {
cond.await();
}
} finally {
lock.unlock();
}
}
}
Редактировать Добавлен метод hasChildren () в JavaSourceEvaluator.