Используйте многопоточность для обработки нескольких файлов - PullRequest
0 голосов
/ 10 января 2020

У меня есть файл, который мне нужно использовать для выполнения функции wordcount (на основе MapReduce), но, используя потоки, я беру файл и делю его на несколько маленьких файлов, затем я oop маленьких файлов для подсчета количество вхождений слов с функцией Reduce(), как я могу реализовать потоки с помощью функции run(), чтобы использовать их с функцией Reduce.

вот мой код:

public class WordCounter implements Runnable {

private String Nom;
    protected static int Chunks =  1 ;
    public WordCounter (String n) {
        Nom = n;
    }

   public void split () throws IOException
    {

    File source = new File(this.Nom);
    int maxRows = 100;
    int i = 1;

        try(Scanner sc = new Scanner(source)){
            String line = null;
            int lineNum = 1;

            File splitFile = new File(this.Nom+i+".txt");

            FileWriter myWriter = new FileWriter(splitFile);

            while (sc.hasNextLine()) {
            line = sc.nextLine();

                if(lineNum > maxRows){
                    Chunks++;
                    myWriter.close();
                    lineNum = 1;
                    i++;
                    splitFile = new File(this.Nom+i+".txt");
                    myWriter = new FileWriter(splitFile);
                }

                myWriter.write(line+"\n");
                lineNum++;
            }

            myWriter.close();

        }

}
public void Reduce() throws IOException 
    {

        ArrayList<String> words = new ArrayList<String>();
        ArrayList<Integer> count = new ArrayList<Integer>(); 

            for (int i = 1; i < Chunks; i++) {

            //create the input stream (recevoir le texte)
            FileInputStream fin = new FileInputStream(this.getNom()+i+".txt");
            //go through the text with a scanner
            Scanner sc = new Scanner(fin);

            while (sc.hasNext()) {
                //Get the next word
                String nextString = sc.next();

                //Determine if the string exists in words
                if (words.contains(nextString)) {
                    int index = words.indexOf(nextString);

                    count.set(index, count.get(index)+1);

                }
                else {
                    words.add(nextString);
                    count.add(1);
                }
            }
                sc.close();
                fin.close();
            }

            // Creating a File object that represents the disk file. 
            FileWriter myWriter = new FileWriter(new File(this.getNom()+"Result.txt"));
            for (int i = 0; i < words.size(); i++) {
                myWriter.write(words.get(i)+ " : " +count.get(i) +"\n");    
            }
            myWriter.close();

            //delete the small files
            deleteFiles();
    }
      public void deleteFiles()
    {
        File f= new File("");
        for (int i = 1; i <= Chunks; i++) {
            f = new File(this.getNom()+i+".txt");
            f.delete();
        }
    }

}

Ответы [ 2 ]

0 голосов
/ 11 января 2020

Я вроде нашел решение, когда назначаю поток каждому маленькому файлу, затем вызываю функцию Reduce () внутри функции run (), но я все еще не до конца разбираюсь в этом, вот код :

public void Reduce() throws IOException 
    {

        ArrayList<String> words = new ArrayList<String>();
        ArrayList<Integer> count = new ArrayList<Integer>(); 
        Thread TT= new Thread();
            for (int i = 1; i < Chunks; i++) {

            //create the input stream (recevoir le texte)
            FileInputStream fin = new FileInputStream(this.getNom()+i+".txt");

            TT=new Thread(this.getNom()+i+".txt");
            TT.start();

            //go through the text with a scanner
            Scanner sc = new Scanner(fin);

            while (sc.hasNext()) {
                //Get the next word
                String nextString = sc.next();

                //Determine if the string exists in words
                if (words.contains(nextString)) {
                    int index = words.indexOf(nextString);

                    count.set(index, count.get(index)+1);

                }
                else {
                    words.add(nextString);
                    count.add(1);
                }
            }
                sc.close();
                fin.close();
            }

            // Creating a File object that represents the disk file. 
            FileWriter myWriter = new FileWriter(new File(this.getNom()+"Result.txt"));
            for (int i = 0; i < words.size(); i++) {
                myWriter.write(words.get(i)+ " : " +count.get(i) +"\n");    
            }
            myWriter.close();

            //Store the result in the new file
            deleteFiles();
    }
public void run() {

        try {
            this.Reduce();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
public static void main(String[] args) throws IOException {
        Wordcounter w1 = new Wordcounter("Words.txt");
        Thread T1= new Thread(w1);
        T1.start();
}
0 голосов
/ 10 января 2020

Лучше использовать Callable вместо использования Runnable интерфейса и таким образом вы можете извлекать свои данные.

Таким образом, чтобы исправить свой код, вы можете более или менее сделайте что-то вроде этого:

public class WordCounter {
       private static ExecutorService threadPool = Executors.newFixedThreadPool(5);  // 5 represents the number of concurrent threads.

       public Map<String, Integer> count(String filename) {
          int chunks = splitFileInChunks(filename);
          List<Future<Report>> reports = new ArrayList<Future<Report>>();

          for (int i=1; i<=chunks; i++) {
             Callable<Report> callable = new ReduceCallable(filename + i + ".txt");
             Future<Report> future = threadPool.submit(callable);
             reports.add(future);
          }

          Map<String, Integer> finalMap = new HashMap<>();
          for (Future<Report> future : reports) {
              Map<String, Integer>  map = future.get().getWords();
              for (Map.Entry<String, Integer> entry : map.entrySet()) {
                  int oldCnt = finalMap.get(entry.getKey()) != null ? finalMap.get(entry.getKey()) : 0;
                  finalMap.put(entry.getKey(), entry.getValue() + oldCnt);
              }
          }
          //  return a map with the key being the word and the value the counter for that word
          return finalMap; 
       }

       // this method doesn't need to be run on the separate thread
       private int splitFileInChunks(String filename) throws IOException { .... }
    }

    public class Report {
           Map<String, Integer> words = new HashMap<>();
           // ... getter, setter, constructor etc
    }

    public class ReduceCounter implements Callable<Report> { 
        String filename;
        public ReduceCounter(String filename) { this.filename = filename;}

         public Report call() {
            //  store the values in a Map<String, Integer> since it's easier that way
            Map<String, Integer>  myWordsMap = new HashMap<String, Integer>;
            // here add the logic from your Reduce method, without the for loop iteration
            // you should add logic to read only the file named with the value from "filename" 

            return new Report(myWordsMap);
         }
    }

Обратите внимание, что вы можете пропустить класс Report и вернуть Future<Map<String,Integer>>, но я использовал Report, чтобы упростить отслеживание.

Обновление для Runnable по запросу пользователя

public class WordCounter {
         public Map<String, Integer> count(String filename) throws InterruptedException {
           int chunks = splitFileInChunks(filename);
           List<ReduceCounter> counters = new ArrayList<>(); 
           List<Thread> reducerThreads = new ArrayList<>(); 

          for (int i=1; i<=chunks; i++) {
             ReduceCounter  rc = new ReduceCounter(filename + i + ".txt");
             Thread t = new Thread(rc); 
             counters.add(rc);
             reducerThreads.add(t);
             t.start();
          }
          // next wait for the threads to finish processing
          for (Thread t : reducerThreads) {
                t.join();
          }
          // now grab the results from each of them
          for (ReduceCounter cnt : counters ) {
               cnt.getWords();
               // next just merge the results here...
          }
}

Класс редуктора должен выглядеть следующим образом:

public class ReduceCounter implements Runnable { 
        String filename;
        Map<String, Integer> words = new HashMap();
        public ReduceCounter(String filename) { this.filename = filename;}

         public void run() {
            //  store the values in the "words" map
            // here add the logic from your Reduce method, without the for loop iteration
            // also read, only the file named with the value from "filename" 

         }
        public Map<String, Integer> getWords() {return words;}
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...