Моя цель - подсчитать сумму элементов в двоичном дереве, используя ExecutorService
в Java, а затем собрать результат для каждой задачи, используя CompletionService
.
Пользователь задает высоту дерева, уровень, с которого должен начинаться параллелизм, и количество потоков, которые будут использоваться. Я знаю, что ExecutorService
должен порождать ровно столько же потоков, что и пользователь, а служба завершения должна порождать ровно N заданий в методе preProcess
, где N равно 2 ^ (уровень параллелизма), поскольку на определенном уровне, n, у нас будет 2 ^ n узлов.
Моя проблема в том, что я не знаю, как начать обход дерева с заданной высоты и как использовать CompletionService
для сбора результатов в моем методе postProcess
. Кроме того, каждый раз, когда создается новая задача, общее количество задач увеличивается на единицу, и каждый раз, когда CompletionService
возвращает результат, количество задач должно быть уменьшено на единицу.
Я смог использовать CompletionService
в функции processTreeParallel
, но я действительно не понимаю, как я могу использовать его в моем методе postProcess
.
Вот мой код:
import java.util.concurrent.*;
public class TreeCalculation {
// tree level to go parallel
int levelParallel;
// total number of generated tasks
long totalTasks;
// current number of open tasks
long nTasks;
// total height of tree
int height;
// Executors
ExecutorService exec;
CompletionService<Long> cs;
TreeCalculation(int height, int levelParallel) {
this.height = height;
this.levelParallel = levelParallel;
}
void incrementTasks() {
++nTasks;
++totalTasks;
}
void decrementTasks() {
--nTasks;
}
long getNTasks() {
return nTasks;
}
// Where the ExecutorService should be initialized
// with a specific threadCount
void preProcess(int threadCount) {
exec = Executors.newFixedThreadPool(threadCount);
cs = new ExecutorCompletionService<Long>(exec);
nTasks = 0;
totalTasks = 0;
}
// Where the CompletionService should collect the results;
long postProcess() {
long result = 0;
return result;
}
public static void main(String[] args) {
if (args.length != 3) {
System.out.println(
"usage: java Tree treeHeight levelParallel nthreads\n");
return;
}
int height = Integer.parseInt(args[0]);
int levelParallel = Integer.parseInt(args[1]);
int threadCount = Integer.parseInt(args[2]);
TreeCalculation tc = new TreeCalculation(height, levelParallel);
// generate balanced binary tree
Tree t = Tree.genTree(height, height);
//System.gc();
// traverse sequential
long t0 = System.nanoTime();
long p1 = t.processTree();
double t1 = (System.nanoTime() - t0) * 1e-9;
t0 = System.nanoTime();
tc.preProcess(threadCount);
long p2 = t.processTreeParallel(tc);
p2 += tc.postProcess();
double t2 = (System.nanoTime() - t0) * 1e-9;
long ref = (Tree.counter * (Tree.counter + 1)) / 2;
if (p1 != ref)
System.out.printf("ERROR: sum %d != reference %d\n", p1, ref);
if (p1 != p2)
System.out.printf("ERROR: sum %d != parallel %d\n", p1, p2);
if (tc.totalTasks != (2 << levelParallel)) {
System.out.printf("ERROR: ntasks %d != %d\n",
2 << levelParallel, tc.totalTasks);
}
// print timing
System.out.printf("tree height: %2d "
+ "sequential: %.6f "
+ "parallel with %3d threads and %6d tasks: %.6f "
+ "speedup: %.3f count: %d\n",
height, t1, threadCount, tc.totalTasks, t2, t1 / t2, ref);
}
}
// ============================================================================
class Tree {
static long counter; // counter for consecutive node numbering
int level; // node level
long value; // node value
Tree left; // left child
Tree right; // right child
// constructor
Tree(long value) {
this.value = value;
}
// generate a balanced binary tree of depth k
static Tree genTree(int k, int height) {
if (k < 0) {
return null;
} else {
Tree t = new Tree(++counter);
t.level = height - k;
t.left = genTree(k - 1, height);
t.right = genTree(k - 1, height);
return t;
}
}
// ========================================================================
// traverse a tree sequentially
long processTree() {
return value
+ ((left == null) ? 0 : left.processTree())
+ ((right == null) ? 0 : right.processTree());
}
// ========================================================================
// traverse a tree parallel
// This is where I was able to use the CompletionService
long processTreeParallel(TreeCalculation tc) {
tc.totalTasks = 0;
for(long i =0; i<(long)Math.pow(tc.levelParallel, 2); i++)
{
tc.incrementTasks();
tc.cs.submit(new Callable<Long>(){
@Override
public Long call() throws Exception {
return processTree();
}
});
}
Long result = Long.valueOf(0);
for(int i=0; i<(long)Math.pow(2,tc.levelParallel); i++) {
try{
result += tc.cs.take().get();
tc.decrementTasks();
}catch(Exception e){}
}
return result;
}
}