Нужна помощь в реализации этого алгоритма с картой Hadoop MapReduce - PullRequest
6 голосов
/ 07 июня 2010

У меня есть алгоритм, который будет проходить через большой набор данных, читать некоторые текстовые файлы и искать конкретные термины в этих строках. Я реализовал его в Java, но я не хотел публиковать код, чтобы он не выглядел, я ищу кого-то, кто может реализовать его для меня, но это правда, мне действительно нужна большая помощь !!! Это не было запланировано для моего проекта, но набор данных оказался огромным, поэтому учитель сказал мне, что я должен сделать это так.

РЕДАКТИРОВАТЬ (я не уточнил в предыдущей версии) У меня есть набор данных на кластере Hadoop, и я должен сделать его реализацию MapReduce

Я читал о MapReduce и думал, что сначала я делаю стандартную реализацию, а потом будет более / менее легче сделать это с помощью mapreduce. Но этого не произошло, так как алгоритм довольно глупый и ничего особенного, а карта уменьшена ... я не могу обернуться вокруг этого.

Итак, вот кратко псевдокод моего алгоритма

LIST termList   (there is method that creates this list from lucene index)
FOLDER topFolder

INPUT topFolder
IF it is folder and not empty
    list files (there are 30 sub folders inside)
    FOR EACH sub folder
        GET file "CheckedFile.txt"
        analyze(CheckedFile)
    ENDFOR
END IF


Method ANALYZE(CheckedFile)

read CheckedFile
WHILE CheckedFile has next line
    GET line
    FOR(loops through termList)
            GET third word from line
          IF third word = term from list
        append whole line to string buffer
    ENDIF
ENDFOR
END WHILE
OUTPUT string buffer to file

Кроме того, как вы можете видеть, каждый раз, когда вызывается «анализ», должен быть создан новый файл, я понял, что сокращение карты сложно записать во многие выходные данные ???

Я понимаю интуицию mapreduce, и мой пример кажется идеально подходящим для mapreduce, но когда дело доходит до этого, очевидно, я не знаю достаточно, и я STUCK!

Пожалуйста, помогите.

Ответы [ 2 ]

3 голосов
/ 07 июня 2010

Вы можете просто использовать пустой редуктор и разделить вашу работу, чтобы запустить один маппер на файл. Каждый маппер создаст свой собственный выходной файл в вашей выходной папке.

2 голосов
/ 07 июня 2010

Map Reduce легко реализуется с помощью некоторых приятных функций параллелизма Java 6, особенно Future, Callable и ExecutorService.

Я создал Callable, который будет анализировать файл так, как вы указали

public class FileAnalyser implements Callable<String> {

  private Scanner scanner;
  private List<String> termList;

  public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
    this.termList = termList;
    scanner = new Scanner(new File(filename));
  }

  @Override
  public String call() throws Exception {
    StringBuilder buffer = new StringBuilder();
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      String[] tokens = line.split(" ");
      if ((tokens.length >= 3) && (inTermList(tokens[2])))
        buffer.append(line);
    }
    return buffer.toString();
  }

  private boolean inTermList(String term) {
    return termList.contains(term);
  }
}

Нам нужно создать новый вызываемый объект для каждого найденного файла и отправить его в службу исполнителя. Результатом представления является будущее, которое мы можем использовать позже, чтобы получить результат анализа файла.

public class Analayser {

  private static final int THREAD_COUNT = 10;

  public static void main(String[] args) {

    //All callables will be submitted to this executor service
    //Play around with THREAD_COUNT for optimum performance
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    //Store all futures in this list so we can refer to them easily
    List<Future<String>> futureList = new ArrayList<Future<String>>();

    //Some random term list, I don't know what you're using.
    List<String> termList = new ArrayList<String>();
    termList.add("terma");
    termList.add("termb");

    //For each file you find, create a new FileAnalyser callable and submit
    //this to the executor service. Add the future to the list
    //so we can check back on the result later
    for each filename in all files {
      try {
        Callable<String> worker = new FileAnalyser(filename, termList);
        Future<String> future = executor.submit(worker);
        futureList.add(future);
      }
      catch (FileNotFoundException fnfe) {
        //If the file doesn't exist at this point we can probably ignore,
        //but I'll leave that for you to decide.
        System.err.println("Unable to create future for " + filename);
        fnfe.printStackTrace(System.err);
      }
    }

    //You may want to wait at this point, until all threads have finished
    //You could maybe loop through each future until allDone() holds true
    //for each of them.

    //Loop over all finished futures and do something with the result
    //from each
    for (Future<String> current : futureList) {
      String result = current.get();
      //Do something with the result from this future
    }
  }
}

Мой пример здесь далек от завершения и далек от эффективности. Я не учел размер выборки, если он действительно велик, вы можете продолжать циклически просматривать список будущих записей, удаляя законченные элементы, что-то похожее на:

while (futureList.size() > 0) {
      for (Future<String> current : futureList) {
        if (current.isDone()) {
          String result = current.get();
          //Do something with result
          futureList.remove(current);
          break; //We have modified the list during iteration, best break out of for-loop
        }
      }
}

В качестве альтернативы вы могли бы реализовать настройку типа «производитель-потребитель», в которой производитель отправляет вызовы в службу исполнителя и создает будущее, а потребитель получает результат из будущего и отбрасывает будущее.

Для этого, возможно, потребуются сами продукты и потребители, а также синхронизированный список для добавления / удаления фьючерсов.

Любые вопросы, пожалуйста, задавайте.

...