pthread_cond_broadcast () SIGSEGV в межпроцессной синхронизации - PullRequest
0 голосов
/ 16 декабря 2018

Существуют две программы Producer и Consumer.

Я использую общую память, поэтому очередь видна для обеих программ.Очередь состоит из мьютекса, двух условных переменных, количества элементов в буфере и буфере.

Таким образом, первая программа выделяет буфер и мьютекс, читает одно за другим число из файла и записывает в буфер.Если буфер пуст, то Потребитель должен пассивно ждать, пока Поставщик введет туда хотя бы один номер.

Если буфер заполнен, то поставщик должен пассивно ждать, пока потребитель не извлечет хотя быодин номер.

Поставщик заканчивает свою работу, как только он читает последний символ из файла и проверяет, что Потребитель прочитал его.

Потребитель заканчивает свою работу, когда буфер пусти завершение работы Поставщика.


Итак, проблема в том, что время от времени сигнал SIGSEGV выбрасывается в pthread_cond_broadcast(&queue->notEmpty);.Valgrind выводит InvalidRead в pthread_cond_broadcast.

Может кто-нибудь объяснить, в чем проблема в коде?

Первая программа (Производитель):

#include <iostream>
#include <fstream>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/sem.h>
#include <pthread.h>

using namespace std;

#define ARRAYSIZE 5

typedef struct {
    int buf[ARRAYSIZE];
    pthread_mutex_t mutex;
    pthread_cond_t notFull, notEmpty;
    int count;
} Queue;

Queue *queue;
int shmQueueId;
int putIndex = 0;

void deleteIds() {
    cout << "delete all ids" << endl;
    pthread_mutex_destroy(&queue->mutex);
    pthread_cond_destroy(&queue->notFull);
    pthread_cond_destroy(&queue->notEmpty);
    shmdt(queue);
    shmctl(shmQueueId, IPC_RMID, nullptr);
    exit(1);
}

Queue queueInit() {
    Queue q;

    for (int i = 0; i < ARRAYSIZE; i++) q.buf[i] = 0;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&q.mutex, &attr);
    pthread_condattr_t attrcond;
    pthread_condattr_init(&attrcond);
    pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&q.notFull, &attrcond);
    pthread_condattr_t attrcond1;
    pthread_condattr_init(&attrcond1);
    pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&q.notEmpty, &attrcond1);
    q.count = 0;

    return q;
}

void queueInit(Queue *q) {
    for (int i = 0; i < ARRAYSIZE; i++) q->buf[i] = 0;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&q->mutex, &attr);
    pthread_condattr_t attrcond;
    pthread_condattr_init(&attrcond);
    pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&q->notFull, &attrcond);
    pthread_condattr_t attrcond1;
    pthread_condattr_init(&attrcond1);
    pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&q->notEmpty, &attrcond1);
    q->count = 0;
}

void shareQueue() {
    //Queue q = queueInit();

    if ((shmQueueId = shmget(111, sizeof(Queue), 0666 | IPC_CREAT)) == -1) {
        deleteIds();
    }
    cout << "shmQueueId" << shmQueueId << endl;

    if ((queue = (Queue *) shmat(shmQueueId, 0, 0)) == (Queue *) -1) {
        deleteIds();
    }

    // *queue = q;
    queueInit(queue);
}

void enqueue(string line) {
    int num = stoi(line);
    queue->buf[putIndex] = num;
    if (++putIndex == ARRAYSIZE) putIndex = 0;
    cout << num << endl;
    (queue->count)++;
    //pthread_cond_signal(&queue->notEmpty);
    pthread_cond_broadcast(&queue->notEmpty);
}

void exit() {
    int exitId;
    bool *exitAddr;
    if ((exitId = shmget(20, sizeof(bool), 0666 | IPC_CREAT)) == -1) {
        deleteIds();
    }

    if ((exitAddr = (bool *) shmat(exitId, 0, 0)) == (bool *) -1) {
        deleteIds();
    }

    exitAddr[0] = true;

    if ((shmdt(exitAddr)) == -1) {
        shmctl(exitId, IPC_RMID, nullptr);
        deleteIds();
    }
}

int main() {
    string inFileName = "inFile.txt";
    ifstream inFile(inFileName);
    string line;

    shareQueue();   // ИНИЦИАЛИЗАЦИЯ

    for (; getline(inFile, line);) {
        pthread_mutex_lock(&queue->mutex);
        while (queue->count == ARRAYSIZE) {
            cout << "producer: queue FULL" << endl;
            pthread_cond_wait(&queue->notFull, &queue->mutex);
        }
        enqueue(line);
        pthread_mutex_unlock(&queue->mutex);
    }

    while (queue->count != 0);

    exit();

    return 0;
}

Вторая программа:

#include <iostream>
#include <fstream>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/sem.h>
#include <signal.h>
#include <pthread.h>

using namespace std;

#define ARRAYSIZE 5
#define STACK 1024*64

struct Queue {
    int buf[ARRAYSIZE];
    pthread_mutex_t mutex;
    pthread_cond_t notFull, notEmpty;
    int count;
};

Queue *queue;
int shmQueueId;
int exitId;
bool *exitAddr;
int takeIndex = 0;

void deleteIds() {
    cout << "delete all ids" << endl;
    shmdt(exitAddr);
    shmctl(exitId, IPC_RMID, nullptr);
    shmdt(queue);
    shmctl(shmQueueId, IPC_RMID, nullptr);
    exit(1);
}

void shareQueue() {
    if ((shmQueueId = shmget(111, sizeof(Queue), 0666 | IPC_CREAT)) == -1) {
        deleteIds();
    }
    cout << "shmQueueId" << shmQueueId << endl;

    if ((queue = (Queue *) shmat(shmQueueId, 0, 0)) == (Queue *) -1) {
        deleteIds();
    }
}

void dequeue() {
    int num = queue->buf[takeIndex];
    queue->buf[takeIndex] = 0;
    if (++takeIndex == ARRAYSIZE) takeIndex = 0;
    cout << num << endl;
    (queue->count)--;
    //int b = pthread_cond_signal(&queue->notFull);
    int b = pthread_cond_broadcast(&queue->notFull);
}

void exitInit() {
    if ((exitId = shmget(20, sizeof(bool), 0666 | IPC_CREAT)) == -1) {
        deleteIds();
    }

    if ((exitAddr = (bool *) shmat(exitId, 0, 0)) == (bool *) -1) {
        deleteIds();
    }
}

bool isSupplierExit() {
    return *exitAddr;
}

void *threadJob(void *arg) {
    while (!(queue->count == 0 && isSupplierExit()));

    deleteIds();

    return 0;
}

int main() {
    //void *stack = malloc(STACK);
    pthread_t p;
    string line;

    exitInit();     //
    shareQueue();   // ИНИЦИАЛИЗАЦИЯ

    /*clone(&threadJob, (char *) stack + STACK,
          SIGCHLD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_VM, nullptr);*/

    pthread_create(&p, NULL, threadJob, NULL);

    //TODO заменить clone на pthread_create

    for (;;) {
        int a1 = pthread_mutex_lock(&queue->mutex);
        while (queue->count == 0) {
            cout << "consumer: queue EMPTY" << endl;
            pthread_cond_wait(&queue->notEmpty, &queue->mutex);
        }
        dequeue();
        int a = pthread_mutex_unlock(&queue->mutex);
    }
}
...