У меня проблема с обнаружением утечки памяти в моей программе.
top сообщает об увеличении использования памяти при запуске программы. При профилировании моей программы с помощью valgrind об утечках памяти не сообщается.
Программа состоит из потока «считывателя» и нескольких потоков «потребителя».
Поток «считывателя» загружает данные в один из нескольких указателей char **, по одному на каждый «потребительский» поток.
«Потребительский» поток работает с данными по своему указателю char * и освобождает память.
Я включил псевдокод, описывающий, что делает моя программа. Я знаю, что предоставленного кода может быть недостаточно для описания проблемы. Я буду рад приложить весь код проекта, если это поможет.
Поток «считывателя» (сокращён для краткости):
// Reader thread (pseudocode): copies every loaded record into a fresh heap
// buffer and hands the buffers out to the consumer queues in round-robin order.
// 'nconsumers': number of consumer threads
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
// 'length': number of datapoints a consumer works on at a time
queue[i] = malloc(length*sizeof(char *));
}
// NOTE(review): 'data' is declared as char * but used below as a pointer to a
// struct with 'content' and 'length' members — pseudocode shorthand, confirm
// the real type.
char *data = NULL;
int qtracker = 0; // index of the consumer queue that receives the next datapoint
int ltracker = 0; // slot inside each queue that the current round is filling
// loaddata reads data and stores it in the 'data' struct
while (loaddata(data) >= 0) {
// NOTE(review): 'legth' looks like a typo for 'length' (compare the memcpy below)
char *datapoint = malloc(data->legth);
memcpy(datapoint, data->content, data->length);
queue[qtracker][ltracker] = datapoint;
qtracker++;
// After every queue received one datapoint, start over at queue 0 and move
// on to the next slot
if (nconsumers == qtracker) {
qtracker = 0;
ltracker++;
// Slots are reused once 'length' of them have been filled
if (length == ltracker) ltracker = 0;
}
}
// NULL pointers are added to the end of each consumer queue to indicate that all data has been read
Поток «потребителя»:
// Consumer thread (pseudocode): walks its own queue slot by slot, works on each
// datapoint, frees it, and stops at the first NULL pointer.
// Consumers are initialized and a queue is assigned to them
// NOTE(review): the string below is a placeholder for an integer queue index,
// not valid C.
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];
// Fetch the first datapoint before entering the loop
// NOTE(review): 'datapoint' is not declared in this fragment and the statement
// lacks its terminating semicolon.
datapoint = dataqueue[datatracker]
datatracker++;
while (datapoint != NULL) {
// Do work on data
// The buffer was allocated by the reader; the consumer releases it
free(datapoint);
datapoint = dataqueue[datatracker];
datatracker++;
// More synchronization code
}
Поток «потребителя» правильно считывает данные и обрабатывает их так, как должен. Опять же, valgrind сообщает об отсутствии утечек памяти. При наблюдении за процессом с помощью top или htop использование памяти этой программой растёт до уровня, при котором моя машина начинает использовать файл подкачки (свопить).
EDIT
Я добавил полную программу, которая воспроизводит ошибку. Это не совсем та программа, в которой я столкнулся с проблемой, поскольку та содержит дополнительные зависимости. Опять же, эта программа порождает 1 поток «считывателя» и N потоков «потребителей». При работе с большим текстовым файлом с сотнями миллионов строк (например, файлами секвенирования ДНК) htop стабильно показывает рост использования памяти, тогда как valgrind не обнаруживает утечек памяти, кроме одной, специфичной для pthreads.
Ещё раз спасибо за помощь!
Скомпилируйте и запустите на любой современной Linux-машине:
gcc -Wall -o <name> <program.c> -lm -lpthread
./<name> large_text_file.txt <num_threads> <max_lines>
Должно отображаться только это предупреждение, поскольку в этом примере я не использую извлечённый указатель:
<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
char *line = NULL;
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
// Shared state handed to the reader thread and to every consumer thread
typedef struct {
// Input file the reader pulls lines from
FILE *fp;
// Number of consumer threads (and therefore of queues)
int numt;
// Synchronization data: the mutex guards the flags and counter below; the
// reader broadcasts on workcond, the consumers signal on readcond
pthread_mutex_t mtx;
pthread_cond_t workcond;
pthread_cond_t readcond;
// Set to 1 by the reader to release the consumers; consumers reset it to 0
int gowork;
// Set to 1 by the last consumer to finish a batch; reset to 0 by the reader
int goread;
// Tracks how many threads are done analyzing the current batch
int doneq;
/*
Stores the "data queues" (1 queue per thread):

    queue            char ***   array of numt queues
    queue[n]         char **    array of maxqueue slots
    queue[n][m]      char *     one record allocated by the reader
                                (length prefix followed by the line bytes)

    queue -> [ queue[0]   queue[1]   ...   queue[numt-1] ]
                [char*]    [char*]           [char*]
                [char*]    [char*]           [char*]
                 ...        ...               ...
                [char*]    [char*]           [char*]
*/
char ***queue;
// Internal thread IDs; each consumer claims one entry and overwrites it with -1
int *threadidx;
// Number of lines the reader loads before waking the consumers
// (threadtinit rounds it to maxqueue*numt)
int maxseqs;
// Maximum number of lines per thread == maxseqs/numt
int maxqueue;
} thread_t;
/*
Reads lines from the text file, copies each line's length and content into a
newly allocated char * buffer and adds it to one of the "data queues" to be
processed by one of the consumers.
*/
void *reader(void *threaddata);
/*
Extracts char * buffers from one of the "data queues". Does work with
the data and frees each buffer when done.
*/
void *consumer(void *threaddata);
/*
Initializes thread data. Returns 1 on success; main treats a 0 return as
failure.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);
/*
Cleans thread data before exit: closes the input file and releases the queues,
the thread ID list and the mutex.
*/
void threadtkill(thread_t *threaddata);
int main(int argc, char *argv[])
{
if (argc < 4) {
fprintf(stderr, "ERROR: Not enough arguments.\n");
exit(-1);
}
FILE *fp = fopen(argv[1], "r");
if (!fp) {
fprintf(stderr, "ERROR: Failed to open input file.\n");
exit(-1);
}
int numt = atoi(argv[2]);
if (!numt) {
fprintf(stderr, "ERROR: Please specify number of threads.\n");
fclose(fp);
exit(-1);
}
int maxseqs = atoi(argv[3]);
if (!maxseqs) {
fprintf(stderr, "ERROR: Please specify max number of lines.\n");
fclose(fp);
exit(-1);
}
//Start data struct for threads
thread_t threaddata;
if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
fprintf(stderr, "ERROR: Could not initialize thread data.\n");
fclose(fp);
exit(-1);
}
fprintf(stderr, "Thread data initialized.\n");
//return code
int ret;
//pthread creation
pthread_t readerthread;
pthread_t *consumerpool = NULL;
consumerpool = malloc((numt)*sizeof(pthread_t));
if (!consumerpool) {
fprintf(stderr, "Failed to allocate threads.\n");
ret = -1;
goto exit;
}
// Initialize and set thread detached attribute
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
//Consumer threads
int thrc;
for (int i = 0; i < numt; i++) {
thrc = pthread_create(consumerpool + i,
&attr,
consumer,
(void *)&threaddata);
if (thrc) {
fprintf(stderr, "ERROR: Thread creation.\n");
ret = -1;
goto exit;
}
}
//Couple of sleeps to keep track of stuff while running
sleep(1);
//Reader thread
thrc = pthread_create(&readerthread,
&attr,
reader,
(void *)&threaddata);
if (thrc) {
fprintf(stderr, "ERROR: Thread creation.\n");
ret = -1;
goto exit;
}
// Free attribute and wait for the other threads
pthread_attr_destroy(&attr);
int jrc;
jrc = pthread_join(readerthread, NULL);
if (jrc) {
fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
}
for (int i = 0; i < numt; i++) {
jrc = pthread_join(*(consumerpool + i), NULL);
if (jrc) {
fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
ret = -1;
goto exit;
}
}
ret = 0;
exit:
threadtkill(&threaddata);
free(consumerpool);
fprintf(stderr, "Finished.\n");
return(ret);
}
void *reader(void *readt)
{
fprintf(stderr, "Reader thread started.\n");
thread_t *threaddata = readt;
int numt = threaddata->numt;
int maxqueue = threaddata->maxqueue;
int maxseqs = threaddata->maxseqs;
FILE *fp = threaddata->fp;
// Array of queues, one per consumer thread
char ***queue = threaddata->queue;
// Number of bytes used to store length of line
size_t bytes = sizeof(ssize_t);
// Tracks number of lines loaded so far
size_t nlines = 0;
// Tracks to which queue data should be added to
int qtracker = 0;
// Tracks to which position in any particular queue, data should be added
int ltracker = 0;
// Holds read line
char *line = NULL;
ssize_t linelength = 0;
size_t n;
// Tracks how much data will be read
size_t totallength = 0;
size_t totallines = 0;
while ( (linelength = getline(&line, &n, fp)) != -1 ) {
// enough data is used to hold line contents + line length
char *data = malloc(bytes + linelength + 1);
if (!data) {
fprintf(stderr, "memerr\n");
continue;
}
// move line lenght bytes to data
memcpy(data, &linelength, bytes);
//move line bytes to data
memcpy(data + bytes, line, linelength + 1);
totallength += linelength;
// Add newly allocated data to one of numt queues
queue[qtracker][ltracker] = data;
qtracker++;
if (numt == qtracker) {
// Loop around queue
qtracker = 0;
ltracker++;
// Loop around positions in queue
if (maxqueue == ltracker) ltracker = 0;
}
nlines++;
// Stop reading thread and start consumer threads
if (nlines == maxseqs) {
fprintf(stderr, "%lu lines loaded\n", nlines);
sleep(3);
totallines += nlines;
nlines = 0;
fprintf(stderr, "Waking up consumers\n");
pthread_mutex_lock(&(threaddata->mtx));
//Wake consumer threads
threaddata->gowork = 1;
pthread_cond_broadcast(&(threaddata->workcond));
//Wait for consumer threads to finish
while ( !threaddata->goread ) {
pthread_cond_wait(&(threaddata->readcond),
&(threaddata->mtx));
}
fprintf(stderr, "Reader has awoken!!!!\n\n");
sleep(3);
threaddata->goread = 0;
pthread_mutex_unlock(&(threaddata->mtx));
}
}
//Add NULL pointers to the end of each queue to indicate reading is done
pthread_mutex_lock(&(threaddata->mtx));
for (int i = 0; i < numt; i++) {
queue[i][ltracker] = NULL;
}
// Wake consumers for the last time
threaddata->gowork = 1;
pthread_cond_broadcast(&(threaddata->workcond));
pthread_mutex_unlock(&(threaddata->mtx));
// Log info
fprintf(stderr, "%lu characters read.\n", totallength);
if (line) free(line);
pthread_exit(NULL);
}
/*
Consumer thread entry point.

consumert points to the shared thread_t. The thread claims one queue ID from
threaddata->threadidx, waits until the reader sets gowork, and then walks its
queue: each record starts with an ssize_t line length followed by the line
bytes. The record is freed once it has been processed. A NULL pointer in the
queue ends the thread.

After maxqueue records the thread reports its batch as done (doneq); the last
consumer to do so wakes the reader (goread). Every consumer then waits for
gowork before continuing at slot 0 of its queue.

Never returns a value; terminates through pthread_exit(NULL).
*/
void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from
    size_t maxqueue = (size_t)threaddata->maxqueue;
    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read
    char *line = NULL;
    // Same type the reader used when it wrote the length prefix
    ssize_t linelength;
    size_t bytes = sizeof linelength;

    // Get queue number for this thread: take the first ID that has not been
    // claimed yet and mark it as taken. The scan stays inside the numt entries
    // of the list.
    int qnum = -1;
    pthread_mutex_lock(&threaddata->mtx);
    for (int i = 0; i < numt && qnum == -1; i++) {
        qnum = threaddata->threadidx[i];
        threaddata->threadidx[i] = -1;
    }
    if (qnum == -1) {
        // Every ID is already taken, so there is no queue to work on
        pthread_mutex_unlock(&threaddata->mtx);
        fprintf(stderr, "ERROR: No free queue ID for consumer thread.\n");
        pthread_exit(NULL);
    }
    fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    pthread_mutex_unlock(&threaddata->mtx);

    // Any thread works on one queue and one queue only
    char **queue = threaddata->queue[qnum];

    // After initializing, wait for reader to start working
    pthread_mutex_lock(&threaddata->mtx);
    while (!threaddata->gowork) {
        pthread_cond_wait(&threaddata->workcond, &threaddata->mtx);
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&threaddata->mtx);

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Position in the queue of the NEXT record to take (slot 0 is taken below)
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];
    while (data != NULL) {
        // Get line length
        memcpy(&linelength, data, bytes);
        // Get line; it is not used by this example, the cast keeps the
        // compiler from warning about it
        line = data + bytes;
        (void)line;
        // Do work
        totallength += (size_t)linelength;
        free(data);

        // Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catch up
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&threaddata->mtx);
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&threaddata->readcond);
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&threaddata->workcond, &threaddata->mtx);
            }
            pthread_mutex_unlock(&threaddata->mtx);
        }
        // Get next line
        data = queue[queuecounter];
        queuecounter++;
    }

    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %zu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}
/*
Initializes the shared thread data.

fp          input file; stored, not opened or closed here
numt        number of consumer threads, must be > 0
threaddata  structure to fill in, must not be NULL
maxseqs     requested number of lines per batch, must be > 0; stored rounded
            up to the next multiple of numt

Returns 1 on success. Returns 0 on invalid arguments or when an allocation or
a pthread object initialization fails; in that case everything allocated here
has been released again and fp is left open for the caller to close.
*/
int threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    if (!threaddata || numt <= 0 || maxseqs <= 0) {
        return 0;
    }

    threaddata->fp = fp;
    threaddata->numt = numt;
    // Determine maximum thread queue length: ceiling of maxseqs/numt, done in
    // integer arithmetic so large line counts are not rounded by a float.
    threaddata->maxqueue = maxseqs / numt + (maxseqs % numt != 0);
    threaddata->maxseqs = threaddata->maxqueue * numt;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);

    // Allocate data for queues and initialize them. calloc leaves every slot
    // NULL, so a slot that was never filled reads as "no data", and the
    // cleanup path can free a partially built array.
    threaddata->queue = calloc((size_t)numt, sizeof *threaddata->queue);
    threaddata->threadidx = malloc((size_t)numt * sizeof *threaddata->threadidx);
    if (!threaddata->queue || !threaddata->threadidx) {
        goto fail;
    }
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = calloc((size_t)threaddata->maxqueue,
                                      sizeof *threaddata->queue[i]);
        if (!threaddata->queue[i]) {
            goto fail;
        }
        threaddata->threadidx[i] = i;
    }

    // Initialize synchronization data
    if (pthread_mutex_init(&threaddata->mtx, NULL) != 0) {
        goto fail;
    }
    if (pthread_cond_init(&threaddata->workcond, NULL) != 0) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&threaddata->readcond, NULL) != 0) {
        goto fail_workcond;
    }
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;

fail_workcond:
    pthread_cond_destroy(&threaddata->workcond);
fail_mutex:
    pthread_mutex_destroy(&threaddata->mtx);
fail:
    if (threaddata->queue) {
        for (int i = 0; i < numt; i++) {
            free(threaddata->queue[i]);
        }
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    threaddata->queue = NULL;
    threaddata->threadidx = NULL;
    return 0;
}
/*
Cleans thread data before exit: closes the input file, frees the queues and
the thread ID list, and destroys the mutex and both condition variables.

Must only be called on a structure that threadtinit initialized successfully,
and only once no thread is using it any more (all threads joined). Records
still stored in the queues are not freed here; they belong to the consumers.
*/
void threadtkill(thread_t *threaddata)
{
    if (!threaddata) {
        return;
    }

    if (threaddata->fp) {
        fclose(threaddata->fp);
        threaddata->fp = NULL;
    }

    if (threaddata->queue) {
        for (int i = 0; i < threaddata->numt; i++) {
            free(threaddata->queue[i]);
        }
    }
    free(threaddata->queue);
    threaddata->queue = NULL;
    free(threaddata->threadidx);
    threaddata->threadidx = NULL;

    // Release every synchronization object threadtinit created, not only the
    // mutex
    pthread_cond_destroy(&threaddata->workcond);
    pthread_cond_destroy(&threaddata->readcond);
    pthread_mutex_destroy(&threaddata->mtx);
}