Java вопрос о многопоточном присоединении - PullRequest
0 голосов
/ 14 января 2020

Так что мне нужно обработать пару файлов данных, используя потоки (уже разделенные), и у меня возникают проблемы с тем, как остановить основной поток, пока все подпотоки не закончатся sh. я оглянулся и попытался использовать join (), но это вызывает проблему:

  • Если я присоединяюсь к основному потоку с последним потоком, то, поскольку другие потоки выполняются одновременно, последний поток не всегда последний завершает sh
  • Если я присоединяюсь к основному потоку со всеми другими потоками, то они не запускаются одновременно, второму нужен первый, чтобы завершить sh первый. также пробовал wait () и notify (), но было еще больше проблем. вот часть моего кода

        public class Matrix extends MapReduce {
        ArrayList<String> VecteurLines = new ArrayList<String>();
        protected int[] nbrLnCol = {0,0};
        protected static double[] res;

        public Matrix(String n) {
            super(n);
        }
        public Matrix(String n,String m){
            super(n,m);
        }
    public void Reduce() throws IOException, InterruptedException, MatrixException {

            for (int i = 1; i <= Chunks; i++) {

                Thread t=new Thread(new RunThread(VecteurLines,i,this));
                t.start();

            }
        }

А вот класс, который обрабатывает потоки


    public class RunThread extends Matrix implements Runnable {
            Matrix ma;
            ArrayList<String> vec;
            int threadNbr;


            public RunThread(ArrayList<String> vec, int threadNbr,Matrix ma)  {
                super("","");
                this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }

            @Override
            public void run() {

                FileInputStream fin = null;
                try {
                    fin = new FileInputStream(ma.getNom()+threadNbr+".txt");
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
                Scanner sc = new Scanner(fin);


                while (sc.hasNext()) {
                    String nextString = sc.next();

                    ma.nbrLnCol[0]++;
                    String [] arr = nextString.split(",");
                    ma.nbrLnCol[1]=arr.length;
                    double c=0;
                    for(int j=0;j<arr.length;j++)
                    {
                        c+=(Double.parseDouble(arr[j])*Double.parseDouble(vec.get(j)));

                    }

                    res[threadNbr-1]=c;
                }
                sc.close();
                try {
                    fin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

                File file = new File(ma.getNom()+threadNbr+".txt");
                file.delete();
            }

Ответы [ 2 ]

0 голосов
/ 26 января 2020

Я считаю, что вам нужно две точки в вашем коде: ваш основной поток должен заканчиваться последним после того, как весь поток будет выполнен, потому что вы сказали

"как остановить основной поток, пока все подпотоки не будут завершены sh "

. Во-вторых, поток должен заканчиваться sh один за другим, то есть 2-й поток должен заканчиваться sh после 1-го потока, как вы сказали

"для второго необходим первый, а для конца sh первый. "

Вот мой код, чтобы сделать это с помощью соединения.

public class Matrix extends MapReduce {
    ArrayList<String> VecteurLines = new ArrayList<String>();
    protected int[] nbrLnCol = {0,0};
    protected static double[] res;

    public Matrix(String n) {
        super(n);
    }
    public Matrix(String n,String m){
        super(n,m);
    }
public void Reduce() throws IOException, InterruptedException, MatrixException {
    Thread t = null;
        for (int i = 1; i <= Chunks; i++) {

            Thread t=new Thread(new RunThread(t,VecteurLines,i,this));
            t.start();

        }
      t.join(); // finally main thread joining with the last thread.
    }

и

public class RunThread extends Matrix implements Runnable {
        Matrix ma;
        ArrayList<String> vec;
        int threadNbr;
        Thread t;


        public RunThread(t,ArrayList<String> vec, int threadNbr,Matrix ma)  {
            this.t = t;
            super("","");
            this.vec=vec;this.threadNbr=threadNbr;this.ma=ma; }

        @Override
        public void run() {                
            FileInputStream fin = null;
            try {
                fin = new FileInputStream(ma.getNom()+threadNbr+".txt");
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            Scanner sc = new Scanner(fin);


            while (sc.hasNext()) {
                String nextString = sc.next();

                ma.nbrLnCol[0]++;
                String [] arr = nextString.split(",");
                ma.nbrLnCol[1]=arr.length;
                double c=0;
                for(int j=0;j<arr.length;j++)
                {
                    c+=(Double.parseDouble(arr[j])*Double.parseDouble(vec.get(j)));

                }

                res[threadNbr-1]=c;
            }
            sc.close();
            try {
                fin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

            File file = new File(ma.getNom()+threadNbr+".txt");
            file.delete();
            if(t!=null){
             t.join(); //join with the previous thread eg. thread2 joining with thread1
            }
        }
0 голосов
/ 14 января 2020

Попробуйте так:

 private List<Thread> threadList = new ArrayList<>();

 public void Reduce() {
     threadList.clear();
     for (int i = 1; i <= Chunks; i++) {
         Thread t  =new Thread(new RunThread(VecteurLines,i,this));
         threadList.add(t);
     }

     // start all worker threads
     for(int i=0; i<threadList.size(); i++){
         threadList.get(i).start();
     }

     // wait until all worker threads is finished
     while (true) {
         int threadIsNotLive = 0;
         for (int i = 0; i < threadList.size(); i++) {
             Thread t = threadList.get(i);
             if (!t.isAlive() || t == null) {
                 ++threadIsNotLive;
             }
         }
         if(threadIsNotLive>0 && (threadList.size() == threadIsNotLive)){
             break;
             // all worker threads is finished
         }
         else {
             Thread.sleep(50);
             // wait until all worker threads is finished
         }
     }
 }

ИЛИ

 public void Reduce() {
     List<Thread> threadList = new ArrayList<>();
     for (int i = 1; i <= Chunks; i++) {
         Thread t  =new Thread(new RunThread(VecteurLines,i,this));
         threadList.add(t);
     }

     // start all worker threads
     for(int i=0; i<threadList.size(); i++){
         threadList.get(i).start();
         threadList.get(i).join();
     }
 }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...