Как и в предыдущем вопросе, я работаю над программой, которая будет запускаться на кластере из Raspberry Pi 4. Моя текущая проблема в том, что при вызове функции four_thread я в любом случае получаю ошибку «malloc(): corrupted top size». Я чувствую себя несколько слепым: вероятно, проблема невероятно очевидна, но я не могу её найти. К счастью, похоже, что сбой происходит уже в момент вызова функции, так как сообщение "Made it to four_thread" не печатается. У кого-нибудь есть идеи? Заранее извините: я, вероятно, опубликую больше кода, чем нужно, чтобы было ясно, что проблема не в других разделах.
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <curl/curl.h>
#include <mpi.h>
/* Number of threads four_thread starts per batch (size of its pthread_t array). */
#define NUMC 4
/*
 * One record per ticker: the symbol plus the numeric statistics collected
 * for it.  set_Stats fills a record and four_thread copies it by value into
 * the caller's results array.
 *
 * main describes this struct to MPI member by member, in declaration order,
 * so any change to the field list must be mirrored in the type, block-length
 * and offset tables built there.
 */
struct BaseStats
{
    /* Identification. */
    char * ticker;                        /* symbol string; only the pointer is stored in the struct */
    int errors;                           /* presumably an error count/flag -- not written in the visible code */

    /* Price levels. */
    float currentPrice;
    float fiftyDayAverage;
    float twoHundredDayAverage;
    float fiftyTwoWeekHigh;

    /* Size and short interest. */
    unsigned long long marketCap;
    unsigned long long floatShares;
    float shortPercentOfFloat;
    float trailingPE;
    unsigned long long sharesOutstanding;

    /* Ratios. */
    float revenuePerShare;
    float profitMargins;
    float currentRatio;
    float quickRatio;
    float returnOnEquity;
    float returnOnAssets;
    float debtToEquity;
    float priceToSales;
    float priceToBook;

    /* Trading activity. */
    unsigned long long averageVolume;
    unsigned long long volume;

    int rank;                             /* printed by main as the ticker's rank; set_Stats calls set_Rank on the record */
};
/* Thread entry point defined later in this file; declared here so four_thread can pass it to pthread_create. */
void *set_Stats(void *z);
/* A list of ticker symbols as returned by getTickers. */
struct Tickers {
    char ** x;      /* array of `length` pointers, one string per symbol */
    int length;     /* number of entries in x */
};
struct Tickers getTickers() //returns a list of nasdaq stocks
{
CURL *curl = curl_easy_init();
char line[300];
struct Tickers tickers;
int length = 0;
char url[60] = "ftp://ftp.nasdaqtrader.com/symboldirectory/nasdaqtraded.txt";
if(curl)
{
FILE *temp = tmpfile();
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)temp);
curl_easy_perform(curl);
if(temp)
{
rewind(temp);
char c = fgetc(temp);
while (fgets(line,sizeof(line),temp))
{
length++;
}
rewind(temp);
length = length - 2;
tickers.length = length;
tickers.x = malloc(length * 7);
fgets(line, sizeof(line), temp);
fgets(line, sizeof(line), temp);
for(int count = 0; count < length; count++)
{
char *string = line + 2;
int tickLen = 0 ;// strstr(string, "|") - string;
if(strstr(string, "|") - string;
{
tickLen = strstr(string, "\") - string;
tickers.x[count] = malloc(tickLen);
strncpy(tickers.x[count], string, tickLen);
tickers.x[count][tickLen] = '\0';
}
fgets(line, sizeof(line), temp);
}
fclose(temp);
}
}
curl_easy_cleanup(curl);
return tickers;
}
/*
 * Fetch the text of a stock web page for `ticker`.
 *
 * The real body is not part of this listing (only the placeholder comment
 * below is).  Until it is filled in, the function returns NULL explicitly
 * instead of falling off the end of a non-void function, whose result the
 * caller would otherwise read as an indeterminate pointer.
 */
char * getStr (char ticker[])
{
    //uses curl to get a char * of a stock website
    (void)ticker;
    return NULL;
}
/*
 * Compute the statistics for `len` tickers, running set_Stats on up to NUMC
 * threads at a time.
 *
 * results: output array with room for `len` records; entry i receives the
 *          record produced for keys[i].  The record is copied by value, so
 *          any pointer inside it is handed over as-is.
 * keys:    array of `len` NUL-terminated ticker strings.
 * len:     number of entries in both arrays; nothing happens for len <= 0.
 *
 * If a thread cannot be started or joined, or it returns NULL, the
 * corresponding results entry is zero-filled.
 */
void four_thread(struct BaseStats results[], char **keys, int len)
{
    /* Newline + flush: these traces must reach the terminal even if the
     * process aborts shortly afterwards. */
    printf("Made it to four_thread\n");
    printf("Starting Thread Initialization\n");
    pthread_t thread[NUMC];
    int started[NUMC];
    printf("Ending Thread Initialization\n");
    fflush(stdout);

    /* Full batches and the final partial batch share one loop. */
    for (int base = 0; base < len; base += NUMC)
    {
        int batch = (len - base < NUMC) ? (len - base) : NUMC;

        for (int slot = 0; slot < batch; slot++)
        {
            printf("CREATING THREAD\n");
            /* Pass the string itself: set_Stats reads its argument as char *. */
            started[slot] = (pthread_create(&thread[slot], NULL, set_Stats,
                                            keys[base + slot]) == 0);
        }
        fflush(stdout);

        for (int slot = 0; slot < batch; slot++)
        {
            void *ret = NULL;
            if (started[slot]
                && pthread_join(thread[slot], &ret) == 0
                && ret != NULL)
            {
                results[base + slot] = *((struct BaseStats *)ret);
                free(ret);      /* the thread allocated the record; its contents now live in results */
            }
            else
            {
                memset(&results[base + slot], 0, sizeof results[base + slot]);
            }
        }
    }
}
/*
 * Thread start routine: build the statistics record for one ticker.
 *
 * arg: the ticker as a NUL-terminated string (char *).
 *
 * Returns a heap-allocated struct BaseStats as the thread's result (what
 * pthread_join reports), or NULL if arg is NULL or memory runs out.  The
 * record's ticker field points to its own heap copy of the symbol; whoever
 * keeps the record owns that copy.
 */
void *set_Stats(void *arg) //input the ticker
{
    const char *symbol = arg;
    if (!symbol)
        return NULL;

    /* Zero-filled so every field has a defined value before it is set. */
    struct BaseStats *z = calloc(1, sizeof *z);
    if (!z)
        return NULL;

    /* ticker is only a pointer: give it storage before copying the symbol in. */
    size_t size = strlen(symbol) + 1;
    z->ticker = malloc(size);
    if (!z->ticker)
    {
        free(z);
        return NULL;
    }
    memcpy(z->ticker, symbol, size);

    /* NOTE(review): ownership of str is not visible here -- confirm whether it must be freed. */
    char *str = getStr(z->ticker);
    // basically it uses str to initialize z's values.
    set_Rank(z);

    /* Returning from the start routine delivers z to pthread_join, like pthread_exit(z). */
    return z;
}
/*
 * Allocate a rows x cols grid of chars backed by one contiguous block.
 *
 * The returned array holds `rows` pointers; array[i] points at byte cols*i of
 * the block, so array[0] addresses the whole rows*cols buffer.
 * The block is zero-filled, so every row starts out as an empty string.
 *
 * Returns NULL if rows or cols is not positive or if allocation fails.
 * Release with free(array[0]) followed by free(array).
 */
char ** allocate_DD(int rows, int cols)
{
    if (rows <= 0 || cols <= 0)
        return NULL;

    size_t nrows = (size_t)rows;
    size_t ncols = (size_t)cols;

    /* calloc checks the size multiplication for overflow. */
    char *data = calloc(nrows, ncols);
    char **array = calloc(nrows, sizeof *array);
    if (!data || !array)
    {
        free(data);
        free(array);
        return NULL;
    }

    for (size_t i = 0; i < nrows; i++)
        array[i] = data + ncols * i;
    return array;
}
int main() //Designed for one master, three slaves
{
int my_id;
char **myTicks;
int myLen;
MPI_Init(NULL,NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &my_id);
MPI_Status status;
MPI_Datatype types[23] = {MPI_CHAR,MPI_INT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,
MPI_UNSIGNED_LONG_LONG,MPI_UNSIGNED_LONG_LONG,MPI_FLOAT,MPI_FLOAT, MPI_UNSIGNED_LONG_LONG,
MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,MPI_FLOAT,
MPI_UNSIGNED_LONG_LONG, MPI_UNSIGNED_LONG_LONG, MPI_INT};
MPI_Datatype MPI_Base_Stats;
int blocklengths[23] ={7,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1};
MPI_Aint offsets[23];
offsets[0] = offsetof(struct BaseStats,ticker); offsets[1] = offsetof(struct BaseStats,errors);
offsets[2] = offsetof(struct BaseStats,currentPrice); offsets[3] = offsetof(struct BaseStats,fiftyDayAverage);
offsets[4] = offsetof(struct BaseStats,twoHundredDayAverage); offsets[5] = offsetof(struct BaseStats,fiftyTwoWeekHigh);
offsets[6] = offsetof(struct BaseStats,marketCap); offsets[7] = offsetof(struct BaseStats,floatShares);
offsets[8] = offsetof(struct BaseStats,shortPercentOfFloat); offsets[9] = offsetof(struct BaseStats,trailingPE);
offsets[10]= offsetof(struct BaseStats,sharesOutstanding); offsets[11] = offsetof(struct BaseStats,revenuePerShare);
offsets[12]= offsetof(struct BaseStats,profitMargins); offsets[13] = offsetof(struct BaseStats,currentRatio);
offsets[14]= offsetof(struct BaseStats,quickRatio); offsets[15]= offsetof(struct BaseStats,returnOnEquity);
offsets[16]= offsetof(struct BaseStats,returnOnAssets); offsets[17]= offsetof(struct BaseStats,debtToEquity);
offsets[18]= offsetof(struct BaseStats,priceToSales); offsets[19]= offsetof(struct BaseStats,priceToBook);
offsets[20]= offsetof(struct BaseStats, averageVolume); offsets[21]= offsetof(struct BaseStats,volume);
offsets[22]= offsetof(struct BaseStats,rank);
MPI_Type_create_struct(23,blocklengths,offsets,types, &MPI_Base_Stats);
MPI_Type_commit(&MPI_Base_Stats);
if(my_id == 0) // meaning this process is a host job
{
struct Tickers tickers = getTickers();
int length = tickers.length / 4;
int remainder = tickers.length % 4;
myLen = length+remainder;
myTicks = allocate_DD(length + remainder, 7);
char ** NodeOneTicks = allocate_DD(length, 7);
char ** NodeTwoTicks = allocate_DD(length, 7);
char ** NodeThrTicks = allocate_DD(length, 7);
int count = 0;
int x = 0;
while(x < myLen)
{
strcpy(myTicks[x], tickers.x[count]);
count++;
x++;
}
x = 0;
while(x < length)
{
strcpy(NodeOneTicks[x], tickers.x[count]);
count++;
x++;
}
x = 0;
while(x < length)
{
strcpy(NodeTwoTicks[x], tickers.x[count]);
count++;
x++;
}
x = 0;
while(x < length)
{
strcpy(NodeThrTicks[x], tickers.x[count]);
count++;
x++;
}
/* MPI_Send(&length,1, MPI_INT,1,0,MPI_COMM_WORLD);
MPI_Send(NodeOneTicks[0],length * 7,MPI_CHAR,1,0,MPI_COMM_WORLD);
free(NodeOneTicks[0]);
free(NodeOneTicks);
MPI_Send(&length,1, MPI_INT,2,0,MPI_COMM_WORLD);
MPI_Send(NodeTwoTicks[0],length * 7,MPI_CHAR,2,0,MPI_COMM_WORLD);
free(NodeTwoTicks[0]);
free(NodeTwoTicks);
MPI_Send(&length,1, MPI_INT,3,0,MPI_COMM_WORLD);
MPI_Send(NodeThrTicks[0],length * 7,MPI_CHAR,3,0,MPI_COMM_WORLD);
free(NodeThrTicks[0]);
free(NodeThrTicks);
*/
struct BaseStats *myStats = calloc(myLen, sizeof(struct BaseStats));
printf("Ticker 31 = %s\n", myTicks[31]);
four_thread(myStats, myTicks, myLen);
printf("%s at a rank of %d\n", myStats[1].ticker, myStats[1].rank);
else // must be slave process
{
myLen = 4;
// MPI_Recv(&myLen,1,MPI_INT,0,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
myTicks = allocate_DD(myLen, 7);
// MPI_Recv(myTicks[0], myLen * 7,MPI_CHAR,0,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
myTicks[0] = "TSLA";
myTicks[1] = "AAPL";
myTicks[2] = "MSFT";
myTicks[3] = "SPY";
// printf("%s\n", myTicks[0]);
}
MPI_Finalize();
}