Чтение и запись потоков Java из массива общего стека - PullRequest
2 голосов
/ 12 марта 2011

Результат этой программы должен быть одинаковым с 1, 2 или 3 потоками.Тем не менее, результат с потоком 1 является реальным.Я думаю, что я не согласен с общими и частными переменными, что я делаю не так?Потоки должны считывать из стека интервал, а затем вычислять квадратурную модель. Если ошибка достаточно мала (то есть с заданной точностью), то у нас есть решение.Если ошибка все еще слишком велика, интервал делится на два с половиной требуемой ошибки, присваиваемой каждой половине интервала.Квадратура применяется снова и так далее, пока ошибка не станет достаточно маленькой.Основная проблема возникает из-за преждевременного завершения потоков.Стек может быть пустым, но другой поток может поставить новые задачи на него.Решение этой проблемы состоит в том, чтобы вести подсчет «активных» потоков, то есть тех, которые в данный момент обрабатывают интервал.Тогда код должен завершаться только тогда, когда стек пуст и нет активных потоков ...

Пожалуйста, любая помощь будет очень признателен?

Приветствия

 import java.lang.Integer;


class quadtest  {

/* Adaptive Quadrature Code. Finds the value of an integral of a 
   function on a closed interval to a specified accuracy.
*/

  public static void main (String args[]) {

    int nthreads = Integer.parseInt(args[0]);

    double left, right, eps;
    double start_time, time;

    Quad quad =null;
    //Counter counter = new Counter();

    left  = 0.0;
    right = 1.0;
    eps   = 1.0E-11;

    System.out.println("Adaptive Quadrature Program \n");
    System.out.println("eps="+eps+"    n=10000");
    start_time = System.currentTimeMillis();

    //Start threads
    Thread thread_object [] = new Thread[nthreads];

    for(int i=0;i<nthreads;i++){    
        quad = new Quad(left,right,eps,i,nthreads);
        thread_object[i]=new Thread(quad);
    }
    for(int i=0;i<nthreads;i++){    
        thread_object[i].start();
    }
    //Join the threads
    for(int i=0;i<nthreads;i++){
        try{
        thread_object[i].join();
        }catch(InterruptedException x){}
    }

    time = (double) (System.currentTimeMillis()-start_time) / 1000.;
    System.out.println("Result  = "  + quad.getResult() );
    System.out.println("Execution time = "  + time + " seconds ");


    }
}

    import java.lang.Runnable;
import java.util.concurrent.atomic.AtomicInteger;

class Quad implements Runnable{
    //Shared Variables 
    static volatile double [][] stack;
    static volatile boolean first=false;
    static volatile double FinalResult;
    static AtomicInteger threadCounter;
    static AtomicInteger writing;
    static AtomicInteger stackpointer;
    static int nthreads;
    //Constants
    static final int stacksize = 1000;
    static final int il = 0;
    static final int ir = 1;
    static final int ie = 2;
    static final int dims = 3;
    //Private Variables
    private  int tid;
    double left,right,eps;
    private double result;
    private double l,r,ep;


    public Quad(double left, double right, double eps,int tid,int nthreads) {

        this.left = left;
        this.right = right;
        this.eps = eps; 
        this.tid=tid;
        Quad.nthreads = nthreads;
        result = 0.0;
        //Only one thread will do it
        if(first==false){
            first=true;
            stack        =  new double [dims][stacksize];
            threadCounter=  new AtomicInteger(0);
            writing      =  new AtomicInteger(0);
            stackpointer =  new AtomicInteger(1);

            stack[il][stackpointer.get()] = left;
            stack[ir][stackpointer.get()] = right;
            stack[ie][stackpointer.get()] = eps;
            FinalResult=0.0;
         }
    }
    public void run(){
        stackops();
        add();
    }

   public void stackops() {

       double abserror,m, est1, est2;

       while ((stackpointer.get() >= 1)|| threadCounter.get()>0) {


           // Pop next interval off stack.
           synchronized (this){
               threadCounter.incrementAndGet();
               while (writing.get()==1){}
               pop();
           }
           // Compute estimates.
           m    = 0.5 * (l + r);
           est1 = 0.5 * (r - l) * (func(l) + func(r)) ;
           est2 = 0.5 * ((m - l) * (func(l) + func(m)) + (r - m) * 
                 (func(m) + func(r)));
           abserror = Math.abs(est2-est1) / 3.0;


           // Check for desired accuracy: push both halves onto the
           // stack if not accurate enough.        
           if (abserror <= ep) {
               result += est2;
                //System.out.println("ERROR->ID "+tid+"-abserror="+abserror+"-ep="+ep );
                //System.out.flush();
           } else {        
               if (stackpointer.get()+ 2 > stacksize) {
               System.out.println("Stack too small, try stacksize = " 
                          + 2*stacksize);
               }
               //Push into the stack
               synchronized (this){
                    push(m);
                }
             }//end else
             threadCounter.decrementAndGet();
           }//end while
   }//end method

    private synchronized void add(){
        FinalResult +=result;
    }

    private void pop(){
        if(stackpointer.get()>0){
            l   = stack[il][stackpointer.get()];
            r   = stack[ir][stackpointer.get()];
            ep  = stack[ie][stackpointer.get()];
            stackpointer.decrementAndGet();
        }
    }
    private void push (double m){
        writing.set(1);
            if(stackpointer.get()>=-1){
                stackpointer.incrementAndGet();
                stack[il][stackpointer.get()] = l;
                stack[ir][stackpointer.get()] = m;
                stack[ie][stackpointer.get()] = ep * 0.5;

                stackpointer.incrementAndGet();
                stack[il][stackpointer.get()] = m;
                stack[ir][stackpointer.get()] = r;
                stack[ie][stackpointer.get()] = ep * 0.5;
            }
       writing.set(0);
    } 

    public  double getResult(){
        return FinalResult;
    }

    private double func(double x) {

    double q;
    int n;

    n = 10000;
    q = 1000.0;

    for(int i=0;i<n;i++) {
        q -= x;
    }
    if (q == 1.0e10) System.out.println("q = " + q);

    return x * x;

    }
}

Ответы [ 3 ]

2 голосов
/ 12 марта 2011

Ваш код фактически не имеет какого-либо взаимного исключения.

  1. При использовании ключевого слова synchronized вы должны фактически синхронизироваться с объектом, который разделяют все потоки.Однако this в ваших операторах synchronized(this){} относится к неразделенным объектам Quad.
  2. Ваши записи в FinalResult не синхронизированы по той же причине.Кроме того, volatile не является необходимым.
  3. Похоже, вы пытаетесь использовать writing в качестве пользовательского спин-цикла, чтобы предотвратить одновременное появление нескольких потоков.Вам это не нужно - ваши блоки synchronized должны были позаботиться об этом - и вы ошиблись.Представьте, что один поток начинает выполнять pop() и, прежде чем он сможет выполнить первую запись, он будет перенесен.Плюс, у вас тоже есть запись в push, которая не охраняется.Что, если pop() и push() будут вызываться одновременно двумя отдельными потоками?

Другие примечания:

  1. Если пункты 2-3 выше работают должным образом, будетне нужно, чтобы stackPointer был атомарным.
  2. Вы можете инициализировать static данные там, где они определены и для объектов.Т.е.:

    class Quad implements Runnable {
        static AtomicInteger threadCounter = new AtomicInteger(0);
        ...
    }
    
0 голосов
/ 13 марта 2011

Класс Quad реализует Runnable { // Общие переменные статический изменчивый двойной стек [] []; статическое энергозависимое логическое значение first = true; статический объект lock1; статический объект lock2; статический двойной FinalResult; статический AtomicInteger threadCounter; статический указатель стека AtomicInteger; статические нити; // Константы static final int stacksize = 1000; статический финал int il = 0; static final int ir = 1; static final int ie = 2; static final int dims = 3; // Частные переменные Int TID; двойной левый, правый, eps;

private double result;
private double l,r,ep;
private boolean calculate;

public Quad(double left, double right, double eps,int tid,int nthreads) {

this.left  = left;
this.right = right;
this.eps   = eps; 
this.tid   = tid;
Quad.nthreads = nthreads;
result = 0.0;

synchronized(this){
    //Only the first thread will do it
    if(first==true){
    first=false;
    lock1 = new Object();
    lock2 = new Object();
    stack        =  new double [dims][stacksize];
    threadCounter=  new AtomicInteger(0);
    stackpointer =  new AtomicInteger(1);
    stack[il][stackpointer.get()] = left;
    stack[ir][stackpointer.get()] = right;
    stack[ie][stackpointer.get()] = eps;
    FinalResult=0.0;
    System.out.println("I am tid= "+tid );
    }
}
}
public void run(){
    stackops();
    add();
}

public void stackops() {

double abserror,m, est1, est2;
est2=est1=m=abserror=0;

while ((stackpointer.get() >= 1)|| threadCounter.get()>0) {


    // Pop next interval off stack.
    synchronized (lock1){
    pop();
    }
    // Compute estimates.
    if (calculate == true){

    m    = 0.5 * (l + r);
    est1 = 0.5 * (r - l) * (func(l) + func(r)) ;
    est2 = 0.5 * ((m - l) * (func(l) + func(m)) + (r - m) * 
              (func(m) + func(r)));
    abserror = Math.abs(est2-est1) / 3.0;  


    if (abserror <= ep) {
        result += est2;
    } else {           
        //Push into the stack
        synchronized (lock1){
        push(m);
        }  
    }//end else
    threadCounter.decrementAndGet();
    }

   }//end while
System.out.println("I am " + tid+" result = "+result);
}//end method

private void add(){
synchronized(lock1){
    FinalResult +=result;
}
}

private void pop(){
if(stackpointer.get()>0){
    threadCounter.incrementAndGet();
    calculate =true;
    l   = stack[il][stackpointer.get()];
    r   = stack[ir][stackpointer.get()];
    ep  = stack[ie][stackpointer.get()];
    stackpointer.decrementAndGet();
}else{
    calculate =false;
}
}
private void push (double m){

if(stackpointer.get()>=-1){
    stackpointer.incrementAndGet();
    stack[il][stackpointer.get()] = l;
    stack[ir][stackpointer.get()] = m;
    stack[ie][stackpointer.get()] = ep * 0.5;

    stackpointer.incrementAndGet();
    stack[il][stackpointer.get()] = m;
    stack[ir][stackpointer.get()] = r;
    stack[ie][stackpointer.get()] = ep * 0.5;
}
} 

public  double getResult(){
    return FinalResult;
}

private double func(double x) {

double q;
int n;

n = 10000;
q = 1000.0;

for(int i=0;i<n;i++) {
    q -= x;
}
if (q == 1.0e10) System.out.println("q = " + q);

return x * x;

}

}

0 голосов
/ 12 марта 2011

Не прочитав весь ваш код, но только взглянув на него, кажется, что именно для этого и предназначен фреймворк join / fork, который дебютирует в JDK 7. Смотри http://blog.quibb.org/2010/03/jsr-166-the-java-forkjoin-framework/

Вместо того, чтобы возиться с потоками и синхронизировать себя, я бы посоветовал взглянуть на это. Сборки JDK 7 уже могут быть загружены (и, кажется, довольно стабильны). В качестве альтернативы есть отдельная версия объединения / формы, которую можно загрузить отдельно (см. http://g.oswego.edu/dl/concurrency-interest/).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...