Лучше использовать Callable вместо использования Runnable интерфейса и таким образом вы можете извлекать свои данные.
Таким образом, чтобы исправить свой код, вы можете более или менее сделайте что-то вроде этого:
public class WordCounter {
private static ExecutorService threadPool = Executors.newFixedThreadPool(5); // 5 represents the number of concurrent threads.
public Map<String, Integer> count(String filename) {
int chunks = splitFileInChunks(filename);
List<Future<Report>> reports = new ArrayList<Future<Report>>();
for (int i=1; i<=chunks; i++) {
Callable<Report> callable = new ReduceCallable(filename + i + ".txt");
Future<Report> future = threadPool.submit(callable);
reports.add(future);
}
Map<String, Integer> finalMap = new HashMap<>();
for (Future<Report> future : reports) {
Map<String, Integer> map = future.get().getWords();
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int oldCnt = finalMap.get(entry.getKey()) != null ? finalMap.get(entry.getKey()) : 0;
finalMap.put(entry.getKey(), entry.getValue() + oldCnt);
}
}
// return a map with the key being the word and the value the counter for that word
return finalMap;
}
// this method doesn't need to be run on the separate thread
private int splitFileInChunks(String filename) throws IOException { .... }
}
public class Report {
Map<String, Integer> words = new HashMap<>();
// ... getter, setter, constructor etc
}
public class ReduceCounter implements Callable<Report> {
String filename;
public ReduceCounter(String filename) { this.filename = filename;}
public Report call() {
// store the values in a Map<String, Integer> since it's easier that way
Map<String, Integer> myWordsMap = new HashMap<String, Integer>;
// here add the logic from your Reduce method, without the for loop iteration
// you should add logic to read only the file named with the value from "filename"
return new Report(myWordsMap);
}
}
Обратите внимание, что вы можете пропустить класс Report и вернуть Future<Map<String,Integer>>
, но я использовал Report, чтобы упростить отслеживание.
Обновление для Runnable по запросу пользователя
public class WordCounter {
public Map<String, Integer> count(String filename) throws InterruptedException {
int chunks = splitFileInChunks(filename);
List<ReduceCounter> counters = new ArrayList<>();
List<Thread> reducerThreads = new ArrayList<>();
for (int i=1; i<=chunks; i++) {
ReduceCounter rc = new ReduceCounter(filename + i + ".txt");
Thread t = new Thread(rc);
counters.add(rc);
reducerThreads.add(t);
t.start();
}
// next wait for the threads to finish processing
for (Thread t : reducerThreads) {
t.join();
}
// now grab the results from each of them
for (ReduceCounter cnt : counters ) {
cnt.getWords();
// next just merge the results here...
}
}
Класс редуктора должен выглядеть следующим образом:
public class ReduceCounter implements Runnable {
String filename;
Map<String, Integer> words = new HashMap();
public ReduceCounter(String filename) { this.filename = filename;}
public void run() {
// store the values in the "words" map
// here add the logic from your Reduce method, without the for loop iteration
// also read, only the file named with the value from "filename"
}
public Map<String, Integer> getWords() {return words;}
}