Нужно предложение, чтобы избежать тупиков с более высокими шагами итерации - PullRequest
0 голосов
/ 29 марта 2012

Моя программа MPI состоит из ряда процессов, которые отправляют / получают ноль или более сообщений от других процессов. Процессы периодически проверяют, доступны ли сообщения для обработки. Код работает до 3000 итераций. После этого возникают тупики и зависания программы. Пожалуйста, не стесняйтесь бросать любые предложения. Ниже мой псевдокод .. дайте мне знать, если у вас есть какие-либо вопросы.

N - количество узлов обработки:

do{

    if(numberIterations>1)  -- Receive Data
    {
        getdata:
        MPI_Iprobe()
        while(flagprobe !=0)
        {
            If(TAG=StausUpdate)
                Update status of processor;
            If(TAG=Data)
                Process Data;
            MPI_Iprobe()
        }
    }

    if( numberIterations< MaxIterations ) -- Send Data
    {
        for(i=0;i<N;i++)
            MPI_Bsend_init(request[i])

        for(i=0;i<N;i++)
            MPI_Start(request[i])

        numberIterations++;
    }

    if(numberIterations == MaxIterations)  -- Update Processor Status
    {
        for(i=0;i<N;i++)
            MPI_Isend(request1[i]) -- with TAG = StatusUpdate

        goto getdata;
        set endloopflag = 1
    }

    if(numberIterations == MaxIterations && endloopflag ==1) --Final Check
    {
        for(i=0;i<N;i++)
            MPI_Test(request1[i],flagtest);
        if(!flagtest)
            goto getdata;
    }

} while(numberIterations < MaxIterations);

for(i=0;i<N;i++) --Free request
{
    MPI_Request_free(&request[i]);
}

--- Обновлен псевдокод согласно Mark

#include <string.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <fstream>
#include "mpi.h"
#define N 9                       //# of nodes
#define M 10                       //samples number
#define n 2                        //demension of weight vector
#define TAU  0.15
#define DISTANCE 0.1              //measuremeant for two nodes
#define A 0.2                     //learning rate
#define ITERATION_STEPS 1000        // Program goes for ITERATION_STEPS - 1 
#define SAMPLE_STEP 1             //Number of current iteration
#define BT1 0.17
#define BT2 0.02
#define A0 0.9                    //initial learning rate
#define AC 0.05                   //middle learning rate
#define AF 0.001                  //final learning rate
#define TC 4500                     //first period of iteration
#define TF 5000                    //second period of iteration

#define BUFSIZE 400000

using namespace std;

void printtime(double comm_time,double update_time,string filename,int rank);
int checkack(int ack[],int status[]);
int checkstatus(int status[],int procid);
int noof_activeproc(int status[],int myrank);
void printresult(double w[][n],string filename,int rank);
void plot(double w[][n], char* fileName);
void update(double w[][n], double x[], int t,int rank, int g[][9]);
double norm(double a[], double b[]);
double p(double sample[], double w[], int t);
void OneToTwo (int index, int *row, int *col);
int g(int b, int j);

int main(int argc, char *argv[])
{
        int rank,size;;
        MPI_Init(&argc,&argv );
        MPI_Comm_size(MPI_COMM_WORLD, &size);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Status statusprobe,status,status1[N],status2[N];
        MPI_Request request[N],request1[N],request2[N],request3[N]; //N request for N process  per iteration
        double buf[BUFSIZE]; // buffer for the outgoing message;
        int procstatus[N],ack[N]; //store the process status and ack
        double temp[n];
        double tempsend[n];
        ifstream  in1, in2, in3;
        ofstream out,outtime;
        int i, j,k,z,req = 0,req1;
        int checklocation=0; //bookmark for MPI_Test    
        int checklocation1=0; //bookmark for MPI_Test    
        int numberIterations; 
        double samples[M][n];  //for all samples
        double w[N][n]; //for all node weight
        double x[n]; //one sample
        int g[9][9];
        int count=n;
        int flagprobe=0;
        int flagtest=1;
        int flagrecv=0;
        int datareadflag; // flag that checks wheter the data is read or not
        double dataincount;
        int checktestflag;
        int  requestfreeflag=0;
        int flag=0;  //test flag
        int flagtest1=0; // check for the request to update the processor status
        int endloopflag=0;
        int *bptr, bl;

        double start_time,end_time,tupdate_start,tupdate_end,t_temp1,t_temp2;
        double comm_time;
        double update_time=0;

        for(i=0;i<N;i++)
           {
              procstatus[i]=1;  // all the processor are on
              ack[i]=0;
           } 

        // read sample data
        in1.open("samples.dat");
        if(!in1)
        {
        cout<<"100:File openning error. \n";
        exit(100);
        }

        in2.open("initialMap.dat");
        if(!in2)
        {
        cout<<"200:File openning error. \n";
        exit(200);
        }

        in3.open("gij.dat");
        if(!in3)
        {
        cout<<"200:File openning error. \n";
        exit(200);
        }

        for(i=0; i<M; i++)
           for(j=0; j<n; j++){
              in1>>samples[i][j];
                //cout<<samples[i][j]<<"="<<i<<","<<j<<" "; 
                }
        //read initial weights
        for(i=0; i<N; i++)
          for(j=0; j<n; j++) {
            in2>>w[i][j];
            //cout<<w[i][j]<<"="<<i<<","<<j<<" ";
            }   
        //read Gij
        for(i=0; i<9; i++)
          for(j=0; j<9; j++) {
            in3>>g[i][j];
            //cout<<w[i][j]<<"="<<i<<","<<j<<" ";
            }
        //Print W to file
            out.open("w.dot");
            out<<"graph G {"<<endl;
            out<<"size=\"10,10\";"<<endl;
            out<<"ratio=expand;"<<endl;
            out<<"node [shape=circle];"<<endl;
            //out<<"node [shape=point];"<<endl;
            for(i=0; i<9; i++) {
              for(j=0; j<n; j++) {
                if(j == 0) out<<i+1<<"[pos = \"";
                out<<w[i][j];
                if(j == 0) out<<",";
                if(j == 1) out<<"!\"]"<<endl;
                }
            }

            for(i=0; i<9; i++)
              for(j=0; j<i+1; j++) {
                if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl;
                }
            out<<"}"<<endl;


        MPI_Barrier(MPI_COMM_WORLD);    
        MPI_Buffer_attach( buf, BUFSIZE );
        k = 0;  
        numberIterations = 1;
        dataincount=N; //for the first time , all process or has N data in from file.
        datareadflag=1;
        checktestflag=1;
        int tagno=1;
        int prevtag;  //     start_time=MPI_Wtime();
        time_t start,start1,end1,end;
        time(&start);
        do{
                if(numberIterations%SAMPLE_STEP==0)
                    {
                      t_temp1=MPI_Wtime();
                      if(k>=M) k=0;
                      for(j=0; j<n; j++) {
                        x[j]=samples[k][j];
                    }
                      k++;
                     t_temp2=MPI_Wtime();
                    }           

                if(numberIterations>1)
                 { getdata:
                  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flagprobe, &statusprobe);  //tag = numberIterations,
                        while(flagprobe != 0)
                        {
                        if(statusprobe.MPI_TAG==0)  // tag=0 means status update of the processor
                            {
                                int rtemp[1];
                                MPI_Recv(rtemp,1,MPI_INT,statusprobe.MPI_SOURCE,0, MPI_COMM_WORLD, &status );             
                                procstatus[status.MPI_SOURCE]=rtemp[0];
                             }  
                            else
                              {
                              datareadflag=1;
                              dataincount++;
                              MPI_Recv(temp,count,MPI_DOUBLE,statusprobe.MPI_SOURCE,statusprobe.MPI_TAG, MPI_COMM_WORLD, &status );             
                              for(j=0;j<n;j++) w[status.MPI_SOURCE][j]=temp[j];
                              }
                        MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG, MPI_COMM_WORLD,&flagprobe, &statusprobe); 
                        }  //end while
                     } //end  if (no of iteration >1)
                if( numberIterations< ITERATION_STEPS )   // do not send on last iterations.
                        {
                          tupdate_start=MPI_Wtime();
                          update(w,x,k,rank,g);
                          tupdate_end=MPI_Wtime();
                          update_time=update_time+tupdate_end-tupdate_start;                

                          if(req==0) {
                          for(i=0;i<N;i++)
                             { int c=0;
                              if((i!=rank)&&(checkstatus(procstatus,i)==1))  // send if only the process  is active
                               {
                                 MPI_Bsend_init(w[rank], count, MPI_DOUBLE, i ,tagno, MPI_COMM_WORLD,&request[i]);
                                 MPI_Bsend_init(&c,1, MPI_INT,i,0, MPI_COMM_WORLD,&request1[i]);
                                }
                             } //end for
                            req=1;
                            }

                           for(i=0;i<N;i++)
                             {
                              if((i!=rank)&&(checkstatus(procstatus,i)==1))  // send if only the process  is active
                               {

                                    MPI_Start(&request[i]); 
                                      //actual message send.
                                }
                             } 

                            tagno++;
                            requestfreeflag==1;
                            checktestflag=0;
                            dataincount=0;
                            checklocation=0;
                            numberIterations++;
                            datareadflag=0;

                          cout<<numberIterations<<"-th iterations for . "<<rank<<endl;
                        }  //end  if( numberIterations< ITERATION_STEPS )

                /* Before exiting notify all the active process */
                if((numberIterations == ITERATION_STEPS) && (endloopflag==0))  //endloop flag prevent sending twice
                    {   // status value (initially all 1);
                        req1=0;
                       for(i=0;i<N;i++)
                        { 
                          if((i!=rank)&&(checkstatus(procstatus,i)==1))  // check if only the process  is active
                            { 
                               MPI_Start(&request1[i]); 
                             }
                         }  
                       endloopflag=1; 
                       goto getdata;
                      }  //end if

                if(numberIterations == ITERATION_STEPS && endloopflag==1) 
                 {
                    for(i=1;i<N;i++)
                      { 
                       if((i!=rank)&&(checkstatus(procstatus,i)==1))  // check if only the process  is active
                         {
                           MPI_Test(&request[i], &flagtest, &status);
                           MPI_Test(&request1[i], &flagtest1, &status);
                           if(!flagtest || !flagtest1)
                            {
                              checklocation1=i; //for next check continue from i;
                              cout<<"getdata called by" <<rank<<endl;
                              goto getdata;
                             }            
                          } //end if
                       }//end for 
                  } //end if

         } while(numberIterations < ITERATION_STEPS);

        for(i=0;i<N;i++)
          {
           if(i!=rank && request[i]!=MPI_REQUEST_NULL)
             {
                MPI_Request_free(&request[i]);
                MPI_Request_free(&request1[i]);
             }
          }

        if(numberIterations == ITERATION_STEPS)
           {
            char pno[2];
            sprintf(pno,"%d",rank);
            string filename;
            filename=filename+pno;
            filename=filename+".dot";
            char *file=strdup(filename.c_str());
            ofstream out;
            out.open(file);
              //plot(w, "final_map_25.dat",rank);
            out<<"graph G {"<<endl;
            out<<"size=\"10,10\";"<<endl;
            out<<"ratio=expand;"<<endl;
            out<<"node [shape=point];"<<endl;
            //out<<"node [shape=point];"<<endl;
            for(i=0; i<9; i++) {
              for(j=0; j<n; j++) {
                if(j == 0) out<<i+1<<"[pos = \"";
                out<<w[i][j];
                if(j == 0) out<<",";
                if(j == 1) out<<"!\"]"<<endl;
                }
            }

            for(i=0; i<9; i++)
              for(j=0; j<i+1; j++) {
                if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl;
                }
            out<<"}"<<endl;
           }

        MPI_Buffer_detach( &bptr, &bl );
        MPI_Finalize();
return                                                                                                                                                                    0;
}  // End main Program

1 Ответ

1 голос
/ 29 марта 2012

Мои знания о MPI_Start немного ржавые, но разве это не должно быть в паре с MPI_Wait (или некоторым его вариантом)?Без ожидания я задаюсь вопросом, переполнены ли ваши буферы, что является своего рода объяснением для вашей программы, работающей некоторое время перед остановкой.Поскольку вы не отображаете никаких сообщений об ошибках, я могу свободно интерпретировать ваше утверждение , существуют тупики и программа зависает , чтобы охватить ситуацию, когда программа зависает из-за исчерпания буферного пространства.Точно ли это тупик или нет - вопрос спорный.

...