Огромное количество использования памяти, утечки памяти не обнаружено - PullRequest
0 голосов
/ 22 апреля 2020

У меня проблема с обнаружением утечки памяти в моей программе.

top сообщает об увеличении использования памяти при запуске программы. При профилировании моей программы с помощью valgrind об утечках памяти не сообщается.

Программа состоит из потока «считывателя» и нескольких потоков «потребителя».

Поток «считывателя» загружает данные в один из нескольких указатели char **, по одному на каждый «потребительский» поток.

«потребительский» поток работает с данными своего соответствующего указателя char * и освобождает память.

Я включил некоторый псевдокод, описывающий что делает моя программа Я знаю, что предоставленного кода может быть недостаточно для описания проблемы. Я рад включить весь проект кода, если это поможет.

поток "считывателя", сжатый для краткости

//'nconsumers': number of consumer threads
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
    //'length' number of datapoints a 'consumer' works on at a time
    queue[i] = malloc(length*sizeof(char *));
}

char *data = NULL;
int qtracker = 0; //tracks to which 'consumer' data should be assgned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in 'data' struct
while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->legth); 
    memcpy(datapoint, data->content, data->length);
    queue[qtracker][ltracker] = datapoint;
    qtracker++;
    if (nconsumers == qtracker) { 
        qtracker = 0;
        ltracker++;
        if (length == ltracker) ltracker = 0;
    }
}
//NULL pointers are added to the end of each 'consumer' queues to indicate all data has been read

поток "потребителя"

//Consumers are initialized and a queue is assigned to them
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];

datapoint = dataqueue[datatracker]
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

поток "потребителя" правильно считывает данные и обрабатывает их так, как должен , Опять же, valgrind сообщает об отсутствии утечек памяти. При мониторинге моего процесса с помощью top или htop использование памяти этой программой увеличивается до уровня, когда моя машина начинает переставлять.

EDIT

Я добавил полную программу это воспроизводит ошибку. Это не совсем та программа, в которой я столкнулся с проблемой, поскольку она содержит дополнительные зависимости. Опять же, эта программа порождает 1 «читательский» поток и N потребительских потоков. При работе с большим текстовым файлом с сотнями миллионов строк (например, файлами секвенирования ДНК) htop постоянно демонстрирует растущее использование памяти с использованием valgrind, показывающего, что утечки памяти не исключаются, за исключением pthreads, определяемых c one.

Спасибо снова за помощь !!

Скомпилируйте и запустите в любой современной linux коробке

gcc -Wall -o <name> <program.c> -lm -lpthread
./name large_text_file.txt <num_threads> <>

Только это предупреждение должно отображаться, поскольку я использовал извлеченный указатель в этом примере:

<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
     char *line = NULL;
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>

// Data passed to threads
typedef struct {
    //Input file
    FILE *fp;
    //Number of threads
    int numt;
    //Syncronization data
    pthread_mutex_t mtx;
    pthread_cond_t workcond;
    pthread_cond_t readcond;
    int gowork;
    int goread;
    //Tracks how many threads are done analyzing data
    int doneq;
    /*
      Stores "data queues" (1 queue per thread)
      queue ->       [  [ char**    [ char**    [ char**    [ char**    [ char**
len(queue)=numt          [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n])=maxqueue   [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n][m])=data      ...         ...         ...         ...         ...
                         [char*]     [char*]     [char*]     [char*]     [char*]
                                 ]           ]           ]           ]          ]
                                                                                ]
    */
    char ***queue;
    //Internal thread ID
    int *threadidx;
    //Maximum number of lines to read
    int maxseqs;
    //Maximum number of lines per thread == maxseqs/numthreads
    int maxqueue;
} thread_t;

/*
Extracts char * pointers from one of the "data queues". Does work with
the data and frees when done.
*/
void *reader(void *threaddata);

/*
Reads lines from text file, copies line content and length into a char * pointer
and adds it to an "analysis queue" to be processed by one of the "consumers"
*/
void *consumer(void *threaddata);

/*
Initializes thread data
*/
int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);

/*
Cleans thread data before exit
*/
void threadtkill(thread_t *threaddata);


int main(int argc, char *argv[])
{
    if (argc < 4) {
        fprintf(stderr, "ERROR: Not enough arguments.\n");
        exit(-1);
    }

    FILE *fp = fopen(argv[1], "r");
    if (!fp) {
        fprintf(stderr, "ERROR: Failed to open input file.\n");
        exit(-1);
    }

    int numt = atoi(argv[2]);
    if (!numt) {
        fprintf(stderr, "ERROR: Please specify number of threads.\n");
        fclose(fp);
        exit(-1);
    }

    int maxseqs = atoi(argv[3]);
    if (!maxseqs) {
        fprintf(stderr, "ERROR: Please specify max number of lines.\n");
        fclose(fp);
        exit(-1);
    }

    //Start data struct for threads
    thread_t threaddata;
    if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
        fprintf(stderr, "ERROR: Could not initialize thread data.\n");
        fclose(fp);
        exit(-1);
    }
    fprintf(stderr, "Thread data initialized.\n");


    //return code
    int ret;

    //pthread creation
    pthread_t readerthread;
    pthread_t *consumerpool = NULL;
    consumerpool = malloc((numt)*sizeof(pthread_t));
    if (!consumerpool) {
        fprintf(stderr, "Failed to allocate threads.\n");
        ret = -1;
        goto exit;
    }

    // Initialize and set thread detached attribute
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    //Consumer threads
    int thrc;
    for (int i = 0; i < numt; i++) {
        thrc = pthread_create(consumerpool + i,
                              &attr,
                              consumer,
                              (void *)&threaddata);
        if (thrc) {
            fprintf(stderr, "ERROR: Thread creation.\n");
            ret = -1;
            goto exit;
        }
    }

    //Couple of sleeps to keep track of stuff while running
    sleep(1);

    //Reader thread
    thrc = pthread_create(&readerthread,
                          &attr,
                          reader,
                          (void *)&threaddata);
    if (thrc) {
        fprintf(stderr, "ERROR: Thread creation.\n");
        ret = -1;
        goto exit;
    }


    // Free attribute and wait for the other threads
    pthread_attr_destroy(&attr);

    int jrc;
    jrc = pthread_join(readerthread, NULL);
    if (jrc) {
        fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
    }
    for (int i = 0; i < numt; i++) {
        jrc = pthread_join(*(consumerpool + i), NULL);
        if (jrc) {
            fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
            ret = -1;
            goto exit;
        }
    }
    ret = 0;
    exit:
        threadtkill(&threaddata);
        free(consumerpool);
        fprintf(stderr, "Finished.\n");
        return(ret);
}


void *reader(void *readt)
{
    fprintf(stderr, "Reader thread started.\n");
    thread_t *threaddata = readt;
    int numt = threaddata->numt;
    int maxqueue = threaddata->maxqueue;
    int maxseqs = threaddata->maxseqs;
    FILE *fp = threaddata->fp;

    // Array of queues, one per consumer thread
    char ***queue = threaddata->queue;

    // Number of bytes used to store length of line
    size_t bytes = sizeof(ssize_t);
    // Tracks number of lines loaded so far
    size_t nlines = 0;

    // Tracks to which queue data should be added to
    int qtracker = 0;
    // Tracks to which position in any particular queue, data should be added
    int ltracker = 0;

    // Holds read line
    char *line = NULL;
    ssize_t linelength = 0;
    size_t n;

    // Tracks how much data will be read
    size_t totallength = 0;
    size_t totallines = 0;
    while ( (linelength =  getline(&line, &n, fp)) != -1 ) {
        // enough data is used to hold line contents + line length
        char *data = malloc(bytes + linelength + 1);

        if (!data) {
            fprintf(stderr, "memerr\n");
            continue;
        }
        // move line lenght bytes to data
        memcpy(data, &linelength, bytes);
        //move line bytes to data
        memcpy(data + bytes, line, linelength + 1);

        totallength += linelength;

        // Add newly allocated data to one of numt queues
        queue[qtracker][ltracker] = data;
        qtracker++;
        if (numt == qtracker) {
            // Loop around queue
            qtracker = 0;
            ltracker++;
            // Loop around positions in queue
            if (maxqueue == ltracker) ltracker = 0;
        }
        nlines++;
        // Stop reading thread and start consumer threads
        if (nlines == maxseqs) {
            fprintf(stderr, "%lu lines loaded\n", nlines);
            sleep(3);
            totallines += nlines;
            nlines = 0;
            fprintf(stderr, "Waking up consumers\n");
            pthread_mutex_lock(&(threaddata->mtx));
            //Wake consumer threads
            threaddata->gowork = 1;
            pthread_cond_broadcast(&(threaddata->workcond));
            //Wait for consumer threads to finish
            while ( !threaddata->goread ) {
                pthread_cond_wait(&(threaddata->readcond),
                                  &(threaddata->mtx));
            }
            fprintf(stderr, "Reader has awoken!!!!\n\n");
            sleep(3);
            threaddata->goread = 0;
            pthread_mutex_unlock(&(threaddata->mtx));
        }
    }

    //Add NULL pointers to the end of each queue to indicate reading is done
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt; i++) {
        queue[i][ltracker] = NULL;
    }
    // Wake consumers for the last time
    threaddata->gowork = 1;
    pthread_cond_broadcast(&(threaddata->workcond));
    pthread_mutex_unlock(&(threaddata->mtx));

    // Log info
    fprintf(stderr, "%lu characters read.\n", totallength);
    if (line) free(line);
    pthread_exit(NULL);
}


void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from
    int maxqueue = threaddata->maxqueue;

    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read
    char *line = NULL;
    size_t linelength;
    size_t bytes = sizeof(ssize_t);

    // get queue number for corresponding thread
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    int *tlist = threaddata->threadidx;
    while (qnum == -1) {
        qnum = *tlist;
        *tlist = -1;
        tlist++;
    }
    fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));
    // Any thread works on only one and one queue only
    char **queue = threaddata->queue[qnum];

    //After initializing, wait for reader to start working
    pthread_mutex_lock(&(threaddata->mtx));
    while ( !threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Tracks from which position in queue data should be taken from
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];

    while (data != NULL) {
        //get line length
        memcpy(&linelength, data, bytes);

        //get line
        line = data + bytes;

        //Do work
        totallength += linelength;
        free(data);

        //Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catchup
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        //Get next line
        data = queue[queuecounter];
        queuecounter++;
    }

    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %lu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}


int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    threaddata->fp = fp;
    //Determine maximum thread queue length
    threaddata->maxqueue = ceil((float)maxseqs/numt);
    threaddata->maxseqs = threaddata->maxqueue*numt;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);
    threaddata->numt = numt;
    //Allocate data for queues and initilize them
    threaddata->queue = malloc(numt*sizeof(char *));
    threaddata->threadidx = malloc(numt*sizeof(int));
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = malloc(threaddata->maxqueue*sizeof(char *));
        threaddata->threadidx[i] = i;
    }
    //Initialize syncronization data
    pthread_mutex_init(&(threaddata->mtx), NULL);
    pthread_cond_init(&(threaddata->workcond), NULL);
    pthread_cond_init(&(threaddata->readcond), NULL);
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;
}


void threadtkill(thread_t *threaddata)
{
    fclose(threaddata->fp);
    for (int i = 0; i < threaddata->numt; i++) {
        free(threaddata->queue[i]);
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    pthread_mutex_destroy(&(threaddata->mtx));
}

Ответы [ 3 ]

0 голосов
/ 23 апреля 2020

Обратите внимание, что в следующем разделе рассматриваются только фрагменты кода, которые вы называете потоками reader и consumer, хотя, как указано в комментариях и другом ответе, существуют другие потенциальные источники, которые следует проверить на наличие проблем. ..

В вашем reader thread:

while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->legth); 
    ...
    // Note: there are no free(datapoint); calls in this loop
}

Очевидно, datapoint создается в этом блоке, но не освобождается в этом блоке.

Ниже приведены вероятные факторы, способствующие утечкам памяти:

  • Поскольку экземпляр datapoint в reader thread создается внутри блоков, его срок службы существует только внутри этих блоков. Память, которая была создана по этому адресу, продолжает принадлежать процессу, который ее создал, но вне этого блока переменная-указатель, указывающая на эту память, больше не существует, поэтому не может быть освобождена за пределами этого блока. И поскольку я не вижу вызовов free(datapopint) внутри этого блока, он никогда не освобождается.

  • Сложив это, char *datapoint = malloc(data->legth); вызывается за все время oop (без вызова free в промежутке), создавая новую память по новому адресу, перезаписывая адрес ссылается на своего предшественника, что делает невозможным освобождение всех предыдущих выделений.

  • Экземпляр datapoint в consumer thread, хотя имеет тот же символ, что и тот в reader thread не указывает на то же пространство памяти. Таким образом, хотя эта переменная освобождается , она не освобождает экземпляр datapoint, существующий в reader thread.

Отрывок кода из consumer thread

datapoint = dataqueue[datatracker]  //Note no ";" making this code uncompilable
                                    //forcing the conclusion that code posted
                                    //is not code actually used, 
                                    //Also: where is this instance of datapoint
                                    //created ?
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

По вопросам в комментариях и общей Linux информации о потоках:
Почему Valgrind не обнаруживает утечки памяти, ТАК вопрос
передача данных между потоками вопрос
Создание потоков в Linux учебник
LinuxTtutorial: POSIX Threads

0 голосов
/ 28 апреля 2020

Оказывается, нет ничего плохого в моем коде как таковом. Вызов free () после того, как mallo c () освобождает память в куче для повторного использования программой , но это не означает, что она возвращается к система . Причина этого все еще немного не понятна.

Valgrind не сообщал об утечках памяти, потому что их нет.

После исследования купола, читая больше о природе динамич c выделение и посадка памяти здесь:

Принудительное освобождение () для возврата памяти mallo c обратно в ОС

Почему функция free () не работает вернуть память операционной системе?

Будут ли реализации Mallo c возвращать свободную память обратно в систему?

Память не освобождена после вызова free ()

Вызова malloc_trim () после каждого освобождения было достаточно, чтобы система вернула выделенную память.

Например, без вызова malloc_trim (), ЦП и памяти использование моей программы выглядит следующим образом: enter image description here При каждом вызове моего потока «ридера» (первый пик использования ЦП) выделяется некоторая память. Вызов mu «потребительских» потоков освобождает запрошенную память, но память не всегда возвращается в систему согласно синей линии на графике.

С помощью malloc_trim () после каждого free () использование памяти выглядит так, как я ожидал, что это будет выглядеть: enter image description here Когда поток «читателя» выполняет память, связанную с porcess увеличивается. Когда «потребители» работают, память освобождается и возвращается в ОС.

0 голосов
/ 23 апреля 2020

Эта строка выглядит подозрительно:

if (length == ltracker) ltracker++;

Обычно я ожидаю увидеть:

if (length == ltracker) ltracker = 0; /* wrap */

, но без всего контекста это немного нечетко. Кроме того, кажется довольно ясным, что вы создаете гонку между производителем и потребителем со всем этим, что может оказаться даже более трудным для отладки, чем ваша текущая проблема.

Поскольку вы прошли тройной уровень; вы понимаете, что ваше буферное пространство O (n ^ 3); и что free () редко сжимает память вашего процесса. Как правило, Free просто позволяет перерабатывать ранее выделенную кучу; поэтому ваша программа будет расти до тех пор, пока ей больше не нужно будет запрашивать у системы больше памяти, а затем оставаться в этом размере.

...