Моя программа MPI состоит из ряда процессов, которые отправляют / получают ноль или более сообщений от других процессов. Процессы периодически проверяют, доступны ли сообщения для обработки. Код работает до 3000 итераций. После этого возникают тупики и зависания программы. Пожалуйста, не стесняйтесь бросать любые предложения. Ниже мой псевдокод .. дайте мне знать, если у вас есть какие-либо вопросы.
N - количество узлов обработки:
do{
if(numberIterations>1) -- Receive Data
{
getdata:
MPI_Iprobe()
while(flagprobe !=0)
{
If(TAG=StatusUpdate)
Update status of processor;
If(TAG=Data)
Process Data;
MPI_Iprobe()
}
}
if( numberIterations< MaxIterations ) -- Send Data
{
for(i=0;i<N;i++)
MPI_Bsend_init(request[i])
for(i=0;i<N;i++)
MPI_Start(request[i])
numberIterations++;
}
if(numberIterations == MaxIterations) -- Update Processor Status
{
for(i=0;i<N;i++)
MPI_Isend(request1[i]) -- with TAG = StatusUpdate
set endloopflag = 1
goto getdata;
}
if(numberIterations == MaxIterations && endloopflag ==1) --Final Check
{
for(i=0;i<N;i++)
MPI_Test(request1[i],flagtest);
if(!flagtest)
goto getdata;
}
} while(numberIterations < MaxIterations);
for(i=0;i<N;i++) --Free request
{
MPI_Request_free(&request[i]);
}
--- Обновлен псевдокод согласно Mark
#include <string.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <iostream>
#include <fstream>
#include "mpi.h"
//---------------------------------------------------------------------
// Compile-time configuration for the parallel SOM training run.
//---------------------------------------------------------------------
#define N 9 //# of nodes (processing ranks / map units)
#define M 10 //samples number (rows read from samples.dat)
#define n 2 //dimension of a weight vector
#define TAU 0.15 // time constant — used by helpers outside this chunk; TODO confirm
#define DISTANCE 0.1 //measurement (distance) between two nodes — used elsewhere; verify
#define A 0.2 //learning rate
#define ITERATION_STEPS 1000 // Program goes for ITERATION_STEPS - 1 iterations
#define SAMPLE_STEP 1 //pick a new sample every SAMPLE_STEP iterations (see main loop)
#define BT1 0.17 // NOTE(review): used by helpers not shown here — semantics unverified
#define BT2 0.02 // NOTE(review): used by helpers not shown here — semantics unverified
#define A0 0.9 //initial learning rate
#define AC 0.05 //middle learning rate
#define AF 0.001 //final learning rate
#define TC 4500 //first period of iteration (learning-rate schedule boundary — verify)
#define TF 5000 //second period of iteration (schedule end — verify)
#define BUFSIZE 400000 // element count of the buffer handed to MPI_Buffer_attach in main()
using namespace std;
// Forward declarations of helper routines.  Their definitions are not in
// this part of the file; the one-line summaries below are inferred from
// the prototypes and from how main() uses them — confirm against the bodies.
void printtime(double comm_time,double update_time,string filename,int rank); // dump timing measurements per rank (not called in the visible main)
int checkack(int ack[],int status[]); // presumably checks acknowledgements against process status — verify
int checkstatus(int status[],int procid); // returns 1 while process `procid` is active (gates sends in main)
int noof_activeproc(int status[],int myrank); // presumably counts currently active processes — verify
void printresult(double w[][n],string filename,int rank); // write the weight matrix to a file
void plot(double w[][n], char* fileName); // plot/dump the map (call is commented out in main)
void update(double w[][n], double x[], int t,int rank, int g[][9]); // one SOM weight update for sample x at step t
double norm(double a[], double b[]); // presumably a distance between two n-vectors — verify
double p(double sample[], double w[], int t); // neighbourhood/probability kernel — semantics unverified
void OneToTwo (int index, int *row, int *col); // map a linear node index to 2-D grid coordinates
int g(int b, int j); // topology function; NOTE: shadowed by the local array `g` inside main
int main(int argc, char *argv[])
{
int rank,size;;
MPI_Init(&argc,&argv );
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Status statusprobe,status,status1[N],status2[N];
MPI_Request request[N],request1[N],request2[N],request3[N]; //N request for N process per iteration
double buf[BUFSIZE]; // buffer for the outgoing message;
int procstatus[N],ack[N]; //store the process status and ack
double temp[n];
double tempsend[n];
ifstream in1, in2, in3;
ofstream out,outtime;
int i, j,k,z,req = 0,req1;
int checklocation=0; //bookmark for MPI_Test
int checklocation1=0; //bookmark for MPI_Test
int numberIterations;
double samples[M][n]; //for all samples
double w[N][n]; //for all node weight
double x[n]; //one sample
int g[9][9];
int count=n;
int flagprobe=0;
int flagtest=1;
int flagrecv=0;
int datareadflag; // flag that checks wheter the data is read or not
double dataincount;
int checktestflag;
int requestfreeflag=0;
int flag=0; //test flag
int flagtest1=0; // check for the request to update the processor status
int endloopflag=0;
int *bptr, bl;
double start_time,end_time,tupdate_start,tupdate_end,t_temp1,t_temp2;
double comm_time;
double update_time=0;
for(i=0;i<N;i++)
{
procstatus[i]=1; // all the processor are on
ack[i]=0;
}
// read sample data
in1.open("samples.dat");
if(!in1)
{
cout<<"100:File openning error. \n";
exit(100);
}
in2.open("initialMap.dat");
if(!in2)
{
cout<<"200:File openning error. \n";
exit(200);
}
in3.open("gij.dat");
if(!in3)
{
cout<<"200:File openning error. \n";
exit(200);
}
for(i=0; i<M; i++)
for(j=0; j<n; j++){
in1>>samples[i][j];
//cout<<samples[i][j]<<"="<<i<<","<<j<<" ";
}
//read initial weights
for(i=0; i<N; i++)
for(j=0; j<n; j++) {
in2>>w[i][j];
//cout<<w[i][j]<<"="<<i<<","<<j<<" ";
}
//read Gij
for(i=0; i<9; i++)
for(j=0; j<9; j++) {
in3>>g[i][j];
//cout<<w[i][j]<<"="<<i<<","<<j<<" ";
}
//Print W to file
out.open("w.dot");
out<<"graph G {"<<endl;
out<<"size=\"10,10\";"<<endl;
out<<"ratio=expand;"<<endl;
out<<"node [shape=circle];"<<endl;
//out<<"node [shape=point];"<<endl;
for(i=0; i<9; i++) {
for(j=0; j<n; j++) {
if(j == 0) out<<i+1<<"[pos = \"";
out<<w[i][j];
if(j == 0) out<<",";
if(j == 1) out<<"!\"]"<<endl;
}
}
for(i=0; i<9; i++)
for(j=0; j<i+1; j++) {
if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl;
}
out<<"}"<<endl;
MPI_Barrier(MPI_COMM_WORLD);
MPI_Buffer_attach( buf, BUFSIZE );
k = 0;
numberIterations = 1;
dataincount=N; //for the first time , all process or has N data in from file.
datareadflag=1;
checktestflag=1;
int tagno=1;
int prevtag; // start_time=MPI_Wtime();
time_t start,start1,end1,end;
time(&start);
do{
if(numberIterations%SAMPLE_STEP==0)
{
t_temp1=MPI_Wtime();
if(k>=M) k=0;
for(j=0; j<n; j++) {
x[j]=samples[k][j];
}
k++;
t_temp2=MPI_Wtime();
}
if(numberIterations>1)
{ getdata:
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flagprobe, &statusprobe); //tag = numberIterations,
while(flagprobe != 0)
{
if(statusprobe.MPI_TAG==0) // tag=0 means status update of the processor
{
int rtemp[1];
MPI_Recv(rtemp,1,MPI_INT,statusprobe.MPI_SOURCE,0, MPI_COMM_WORLD, &status );
procstatus[status.MPI_SOURCE]=rtemp[0];
}
else
{
datareadflag=1;
dataincount++;
MPI_Recv(temp,count,MPI_DOUBLE,statusprobe.MPI_SOURCE,statusprobe.MPI_TAG, MPI_COMM_WORLD, &status );
for(j=0;j<n;j++) w[status.MPI_SOURCE][j]=temp[j];
}
MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG, MPI_COMM_WORLD,&flagprobe, &statusprobe);
} //end while
} //end if (no of iteration >1)
if( numberIterations< ITERATION_STEPS ) // do not send on last iterations.
{
tupdate_start=MPI_Wtime();
update(w,x,k,rank,g);
tupdate_end=MPI_Wtime();
update_time=update_time+tupdate_end-tupdate_start;
if(req==0) {
for(i=0;i<N;i++)
{ int c=0;
if((i!=rank)&&(checkstatus(procstatus,i)==1)) // send if only the process is active
{
MPI_Bsend_init(w[rank], count, MPI_DOUBLE, i ,tagno, MPI_COMM_WORLD,&request[i]);
MPI_Bsend_init(&c,1, MPI_INT,i,0, MPI_COMM_WORLD,&request1[i]);
}
} //end for
req=1;
}
for(i=0;i<N;i++)
{
if((i!=rank)&&(checkstatus(procstatus,i)==1)) // send if only the process is active
{
MPI_Start(&request[i]);
//actual message send.
}
}
tagno++;
requestfreeflag==1;
checktestflag=0;
dataincount=0;
checklocation=0;
numberIterations++;
datareadflag=0;
cout<<numberIterations<<"-th iterations for . "<<rank<<endl;
} //end if( numberIterations< ITERATION_STEPS )
/* Before exiting notify all the active process */
if((numberIterations == ITERATION_STEPS) && (endloopflag==0)) //endloop flag prevent sending twice
{ // status value (initially all 1);
req1=0;
for(i=0;i<N;i++)
{
if((i!=rank)&&(checkstatus(procstatus,i)==1)) // check if only the process is active
{
MPI_Start(&request1[i]);
}
}
endloopflag=1;
goto getdata;
} //end if
if(numberIterations == ITERATION_STEPS && endloopflag==1)
{
for(i=1;i<N;i++)
{
if((i!=rank)&&(checkstatus(procstatus,i)==1)) // check if only the process is active
{
MPI_Test(&request[i], &flagtest, &status);
MPI_Test(&request1[i], &flagtest1, &status);
if(!flagtest || !flagtest1)
{
checklocation1=i; //for next check continue from i;
cout<<"getdata called by" <<rank<<endl;
goto getdata;
}
} //end if
}//end for
} //end if
} while(numberIterations < ITERATION_STEPS);
for(i=0;i<N;i++)
{
if(i!=rank && request[i]!=MPI_REQUEST_NULL)
{
MPI_Request_free(&request[i]);
MPI_Request_free(&request1[i]);
}
}
if(numberIterations == ITERATION_STEPS)
{
char pno[2];
sprintf(pno,"%d",rank);
string filename;
filename=filename+pno;
filename=filename+".dot";
char *file=strdup(filename.c_str());
ofstream out;
out.open(file);
//plot(w, "final_map_25.dat",rank);
out<<"graph G {"<<endl;
out<<"size=\"10,10\";"<<endl;
out<<"ratio=expand;"<<endl;
out<<"node [shape=point];"<<endl;
//out<<"node [shape=point];"<<endl;
for(i=0; i<9; i++) {
for(j=0; j<n; j++) {
if(j == 0) out<<i+1<<"[pos = \"";
out<<w[i][j];
if(j == 0) out<<",";
if(j == 1) out<<"!\"]"<<endl;
}
}
for(i=0; i<9; i++)
for(j=0; j<i+1; j++) {
if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl;
}
out<<"}"<<endl;
}
MPI_Buffer_detach( &bptr, &bl );
MPI_Finalize();
return 0;
} // End main Program