Map Reduce легко реализуется с помощью некоторых приятных функций параллелизма Java 6, особенно Future, Callable и ExecutorService.
Я создал Callable, который будет анализировать файл так, как вы указали
public class FileAnalyser implements Callable<String> {
private Scanner scanner;
private List<String> termList;
public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
this.termList = termList;
scanner = new Scanner(new File(filename));
}
@Override
public String call() throws Exception {
StringBuilder buffer = new StringBuilder();
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
String[] tokens = line.split(" ");
if ((tokens.length >= 3) && (inTermList(tokens[2])))
buffer.append(line);
}
return buffer.toString();
}
private boolean inTermList(String term) {
return termList.contains(term);
}
}
Нам нужно создать новый вызываемый объект для каждого найденного файла и отправить его в службу исполнителя. Результатом представления является будущее, которое мы можем использовать позже, чтобы получить результат анализа файла.
public class Analayser {
private static final int THREAD_COUNT = 10;
public static void main(String[] args) {
//All callables will be submitted to this executor service
//Play around with THREAD_COUNT for optimum performance
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
//Store all futures in this list so we can refer to them easily
List<Future<String>> futureList = new ArrayList<Future<String>>();
//Some random term list, I don't know what you're using.
List<String> termList = new ArrayList<String>();
termList.add("terma");
termList.add("termb");
//For each file you find, create a new FileAnalyser callable and submit
//this to the executor service. Add the future to the list
//so we can check back on the result later
for each filename in all files {
try {
Callable<String> worker = new FileAnalyser(filename, termList);
Future<String> future = executor.submit(worker);
futureList.add(future);
}
catch (FileNotFoundException fnfe) {
//If the file doesn't exist at this point we can probably ignore,
//but I'll leave that for you to decide.
System.err.println("Unable to create future for " + filename);
fnfe.printStackTrace(System.err);
}
}
//You may want to wait at this point, until all threads have finished
//You could maybe loop through each future until allDone() holds true
//for each of them.
//Loop over all finished futures and do something with the result
//from each
for (Future<String> current : futureList) {
String result = current.get();
//Do something with the result from this future
}
}
}
Мой пример здесь далек от завершения и далек от эффективности. Я не учел размер выборки, если он действительно велик, вы можете продолжать циклически просматривать список будущих записей, удаляя законченные элементы, что-то похожее на:
while (futureList.size() > 0) {
for (Future<String> current : futureList) {
if (current.isDone()) {
String result = current.get();
//Do something with result
futureList.remove(current);
break; //We have modified the list during iteration, best break out of for-loop
}
}
}
В качестве альтернативы вы могли бы реализовать настройку типа «производитель-потребитель», в которой производитель отправляет вызовы в службу исполнителя и создает будущее, а потребитель получает результат из будущего и отбрасывает будущее.
Для этого, возможно, потребуются сами продукты и потребители, а также синхронизированный список для добавления / удаления фьючерсов.
Любые вопросы, пожалуйста, задавайте.