Потоки не могут получать блокировки, даже если их никто не использует - PullRequest
0 голосов
/ 05 января 2019

Я пытаюсь написать код, использующий потоки, чтобы найти файлы, содержащие определенную строку в своем имени.

Мой код работает большую часть времени. в некоторых особых случаях потоки по какой-то причине не могут получить блокировки.

Я очень старался отладить (используя распечатки, как вы можете видеть в коде), но не смог найти проблему.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <dirent.h>
#include <string.h>

void* threadFunction(void* searchTerm);
bool scanDirName(char * path, char * searchTerm);
int numOfThreads;
pthread_mutex_t qlock;
pthread_cond_t cond;
int count;
int matchingFiles;

struct Node {
    char* data;
    struct Node* next;
};

// Two global variables to store an address of front and rear nodes.
    struct Node* front = NULL;
    struct Node* rear = NULL;

// To Enqueue an integer
void Enqueue(char* x) {
/*    printf("\nhere\n");*/
    struct Node* temp = (struct Node*)malloc(sizeof(struct Node));
    temp->data =x;
    temp->next = NULL;
    if(front == NULL && rear == NULL){
        front = rear = temp;
        pthread_cond_signal(&cond);
        return;
    }
    rear->next = temp;
    rear = temp;

}

// To Dequeue an integer.
char* Dequeue() {
    struct Node* temp = front;
    if(front == NULL) {
        return NULL;
    }
    char* data;
    data = front->data;
    if(front == rear) {
        front = rear = NULL;

    }
    else {
        front = front->next;
    }
//    printf("\nfreeing %p, %s\n", temp, temp->data);
    free(temp);
    return data;
}

void* threadFunction(void* st) {
    bool isFinished;
    isFinished = false;
    pthread_mutex_lock(&qlock);
    while (true) {
        char *filepath;
        char *searchTerm;
        searchTerm = (char *) st;

        filepath = Dequeue();
        pthread_mutex_unlock(&qlock);
        if (filepath == NULL) {
            printf("%ld waiting for lock \n",(long) pthread_self());
            pthread_mutex_lock(&qlock);
            count++;
            if (isFinished) {
                printf("%ld waking u up, we found %d items!\n",(long) pthread_self(), matchingFiles);
                pthread_cond_broadcast(&cond);
                if (count == numOfThreads) {
                    printf("Thread exited: %ld\n", (long) pthread_self());
                    pthread_mutex_unlock(&qlock);
                    pthread_exit((void*)0);
                }
            }
            isFinished = false;

            printf("%ld going to sleep\n",(long) pthread_self());
            pthread_cond_wait(&cond, &qlock);
            printf("%ld Woke up, try to compare %d == %d\n",(long) pthread_self(), count, numOfThreads);
            if (count == numOfThreads) {
                printf("Thread exited: %ld\n", (long) pthread_self());
                pthread_mutex_unlock(&qlock);
                pthread_exit((void*)1);
            }
            printf("%ld compare failed \n",(long) pthread_self());
            count--;

        }
        else {
            printf("%ld deq 1 item \n",(long) pthread_self());
            isFinished = scanDirName(filepath, searchTerm);
        }
    }



}

bool scanDirName(char * path, char * searchTerm){
    DIR * d = opendir(path); // open the path
    char* str3;

    if(d==NULL) return false; // if was not able return

;

    struct dirent * dir; // for the directory entries
    while ((dir = readdir(d)) != NULL) // if we were able to read somehting from the directory
    {
        printf("%ld STARTED A ROUND!\n",(long) pthread_self());
        if(dir-> d_type == DT_DIR){ //
            if (dir->d_type == DT_DIR && strcmp(dir->d_name, ".") != 0 & strcmp(dir->d_name, "..") != 0) // if it is a directory
            {
                str3 = malloc((1+strlen("/")+ strlen(path)+ strlen(dir->d_name))*sizeof(char));
                if (!str3){
                    return false;
                }

                strcpy(str3, path);
                strcat(str3, "/");
                strcat(str3, dir->d_name);
//                printf("\n---\n%s\n---\n",str3);
                printf("%ld waiting for lock in func \n",(long) pthread_self());
                pthread_mutex_lock(&qlock);
                printf("%ld wake threads \n",(long) pthread_self());
                Enqueue(str3);
                pthread_cond_signal(&cond);
                printf("%ld enq \n",(long) pthread_self());
                pthread_mutex_unlock(&qlock);
                printf("%ld locks gone \n",(long) pthread_self());

            }
        }
        else if(dir-> d_type == DT_REG){ //
            if(strstr(dir->d_name, searchTerm)){
                matchingFiles++;
                /*printf("%s/%s\n", path, dir->d_name);*/
            }
        }

        printf("%ld finished A ROUND!\n",(long) pthread_self());
    }
    printf("%ld finished scanning!\n",(long) pthread_self());
    fflush(stdout);
    closedir(d); // finally close the directory
    if (count == numOfThreads-1) {
        return true;
    }
    return false;
}

int main(int argc, char* argv[]){
    count = 0;
    pthread_mutex_init(&qlock, NULL);
    pthread_cond_init(&cond, NULL);
    matchingFiles = 0;

    if (argc != 4){
        printf("ERROR\n");
        exit(1);
    }
    char* rootSearchDir = argv[1];
    char* searchTerm = argv[2];
    int threadsNumber = atoi(argv[3]);

    pthread_t threadsCollection[threadsNumber];

    Enqueue(rootSearchDir);
    numOfThreads = threadsNumber;

    int i;
    for (i=0; i<threadsNumber; i++){
        if(pthread_create(&threadsCollection[i], NULL, threadFunction, (void*)searchTerm)) {

            fprintf(stderr, "Error creating thread\n");
            pthread_mutex_destroy(&qlock);
            return 1;

        }
    }

    int rc;

    for (i=0; i<threadsNumber; i++){
        rc = pthread_join((threadsCollection[i]), NULL);
        if(rc) {
            fprintf(stderr, "Error joining thread, %d\n", rc);
            pthread_mutex_destroy(&qlock);
            return 1;

        }
    }



}

Я прикрепил весь код (возможно, это важно), но я подозреваю, что проблема в scanDirName или threadFunction.

--- РЕДАКТИРОВАТЬ ---

Я нашел проблему. У меня была логическая проблема с замками, я ее исправил. Спасибо всем!

1 Ответ

0 голосов
/ 05 января 2019
  1. если filepath = Dequeue () возвращает ненулевое значение; тогда ваша следующая итерация этого цикла вызовет Dequeue () без удержания & qlock. Если вы проверили возвращаемые значения, вы заметили бы это.

  2. scanDirName полагается на счетчик, чтобы установить его возвращаемое значение; однако он не защищен qlock {см. # 1}

Если я правильно прочитал ваш код, вы пытаетесь создать своего рода механизм диспетчеризации, в котором имена выводятся из scanDir () в очередь и извлекаются из очереди, где для каждого из них запускается новый scanDir (). вещь. Обработка счета - это попытка определить, произошла ли ошибка в очереди, потому что ничего не осталось сделать, или все заняты выполнением scanDirs.

Механизм немного проще: использовать две очереди. Scandir сбрасывает записи в скажем «NameQ», а функция потока извлекает имена из NameQ, сбрасывая их в WorkQ. Когда у потока заканчиваются элементы NameQ, он ищет в WorkQ новые, которые можно отправить. Тогда вам просто нужно отслеживать, сколько потоков в scanDir (). Если обе очереди пусты, а число потоков в scanDir равно нулю, делать больше нечего. Ваша основная логика выглядит так:

void* threadFunction(void* st) {
    static int nreader = 0;
    int ncount = 0, wcount = 0;
    char *searchTerm;
    searchTerm = (char *) st;
    Menter(&qlock);
    while (nreader || !(empty(&Names) && empty(&Work))) {
        char *filepath;
        filepath = Dequeue(&Names);
        if (filepath) {
            ncount++;
            Enqueue(&Work, filepath);
        } else if ((filepath = Dequeue(&Work)) != NULL) {
            wcount++;
            nreader++;
            Mexit(&qlock);
            scanDirName(filepath, searchTerm);
            Menter(&qlock);
            nreader--;
            cv_wake(&cond);
        } else {
            cv_wait(&cond, &qlock);
        }
    }
    Mexit(&qlock);
    printf("%p: %d, %d items\n", pthread_self(), ncount, wcount);
    return NULL;
}

Mexit, Menter - это обложки для проверки ошибок для приколов posix, и намного проще для глаз, чем вся эта избыточная печать ...

static int menter(pthread_mutex_t *m, int line) {
    if (pthread_mutex_lock(m) != 0) {
        fprintf(stderr, "%p:%d  Mutex lock failed!\n", pthread_self(),line);
        abort();
    }
    DEBUG_LOCK("%d locked\n", line);
    return 0;
}
#define Menter(x) menter((x), __LINE__)
static int mexit(pthread_mutex_t *m, int line) {
    DEBUG_LOCK("%d unlocked\n", line);
    if (pthread_mutex_unlock(m) != 0) {
        fprintf(stderr, "%p:%d  Mutex unlock failed!\n", pthread_self(),line);
        abort();
    }
    return 0;
}
#define Mexit(x) mexit((x), __LINE__)
...