Очередь сообщений: ошибка сегментации при выполнении задачи - PullRequest
0 голосов
/ 08 мая 2018

Кажется, что не может быть никакого прогресса в этом назначении C, надеюсь, кто-то сразу обнаружит проблему: цель состоит в том, чтобы реализовать пул потоков с использованием очереди сообщений POSIX. Потоки создаются при запуске, а затем получают задачу за задачей из очереди сообщений. Ожидается, что потоки получат 16-байтовую структуру типа task_t , которая содержит функцию, а также указатель на аргументы, с которыми она должна выполняться.

Я получаю ошибку сегментации, как только первая задача обрабатывается потоком, т. Е. При выполнении функции foo , доступ к которой в threadPoolFun возможен как task_ptr-> func и предполагается запустить с аргументом task_ptr-> this :

void * threadPoolFun(void * arg) {
    task_t * task_ptr;
    int size = attr.mq_msgsize; //size = 16
    char buf[size];
    while (1) {
        int ret_val = mq_receive(task_queue, buf, size, NULL);
        if (ret_val == -1) {
            perror("Error receiving message: ");
        }
        task_ptr = (task_t *) buf;
        if (task_ptr->func == NULL) {
            break;
        }
        task_ptr->func(task_ptr->this);
    }
    return NULL;
}

Ошибка сегментации:

Program terminated with signal SIGSEGV, Segmentation fault.
#0  0x000000000040086a in foo (arg=0x7ffdd71fdfd0) at all_in_one.c:37
37        td->retVal = (double) td->tid;
[Current thread is 1 (Thread 0x7fec2d544700 (LWP 9853))]
(gdb) list
32    fooTask_t; // type for this tasks
33    
34    //definition of task: write tid to retVal
35    void foo(void * arg) {
36        fooTask_t *td = (fooTask_t *) arg;
37        td->retVal = (double) td->tid;
38    }
39    
40    // initialize task 
41    void fooInit(fooTask_t * t) {
(gdb) backtrace
#0  0x000000000040086a in foo (arg=0x7ffdd71fdfd0) at all_in_one.c:37
#1  0x0000000000400b8c in threadPoolFun (arg=0x0) at all_in_one.c:118
#2  0x00007fec2f1196ba in start_thread (arg=0x7fec2d544700)
    at pthread_create.c:333
#3  0x00007fec2ee4f41d in clone ()

Полный код:

#include <mqueue.h>
#include <pthread.h>
#include <stdio.h>

//number of threads
#define NTH 4
//number of tasks
#define NTASKS 10

void * threadPoolFun(void * arg);
typedef void( * taskfun_t)(void * );

//minimal task data structure: func to be executed with args this, abstraction from actual task
typedef struct minTaskDataStruct {
    void * this;
    taskfun_t func;
}
task_t;

//data structure for actual task
typedef struct fooDataStruct {
    // mandatory entries
    void * this; // pointer to this structure
    taskfun_t func; // function with signature: void foo(void *)

    // data for individual task instances
    long tid; // task id                 
    double retVal; // return value

}
fooTask_t; // type for this tasks

//definition of task: write tid to retVal
void foo(void * arg) {
    fooTask_t *td = (fooTask_t *) arg;
    td->retVal = (double) td->tid;
}

// initialize task 
void fooInit(fooTask_t * t) {
    t-> this = t; // set this pointer
    t-> func = foo; // set task function
}

//data structure for threads
pthread_t th[NTH];
//data structure for task queue attributes
static struct mq_attr attr;
//task queue
mqd_t task_queue;
//create task structs and store them in array td
fooTask_t td[NTASKS];

int main() {


    printf("Setting up tasks\n");
    for (int i = 0; i < NTASKS; i++) {
        fooTask_t task;
        task.tid = i;
        fooInit(&task);
        td[i] = task;
    }

    // set attributes
    attr.mq_flags = 0; /* Flags: 0 or O_NONBLOCK */
    attr.mq_maxmsg = 10; /* Max. # of messages on queue */
    attr.mq_msgsize = 16; /* Max. message size (bytes) */
    printf("Opening task queue\n");
    // set up task queue
    task_queue = mq_open("/my_task_queue_mq", O_CREAT | O_RDWR, 0700, & attr);

    //create threads with default attributes, pass threadpool function
    //threads will run as long as func passed to them is not NULL

    for (long i = 0; i < NTH; i++) {
    printf("Creating thread %ld\n", i);
        pthread_create( & th[i], NULL, threadPoolFun, NULL);
    }
    //send tasks to queue, tasks to be consumed by threads, only send first 16 bytes
    for (int i = 0; i < NTASKS; i++) {
    printf("Sending task %d\n", i); 
        mq_send(task_queue, (const char * ) &td[i], sizeof(task_t), 0);
    }

    //send dummy tasks with func==NULL to terminate threads
    for (int i = 0; i < NTASKS; i++) {
    printf("Sending dummy task %d\n", i); 
        task_t dummy_task;
        dummy_task.this = & dummy_task;
        dummy_task.func = NULL;
        mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);
    }

    //verify task execution
    int sum1 = 0;
    int sum2 = 0;
    for (int i = 0; i < NTASKS; i++) {
        sum1 += td[i].retVal;
        sum2 += i;
    }

    if (sum1 == sum2) {
        printf("Success: Sum1 %d equals Sum2 %d", sum1, sum2);
    } else {
        printf("Fail: Sum1 %d does not Sum2 %d", sum1, sum2);
    }
    return 0;
}

//threadPoolFun function definition
void * threadPoolFun(void * arg) {
    task_t * task_ptr;
    int size = attr.mq_msgsize; //size = 16
    char buf[size];
    while (1) {
        int ret_val = mq_receive(task_queue, buf, size, NULL);
        if (ret_val == -1) {
            perror("Error receiving message: ");
        }
        task_ptr = (task_t *) buf;
        if (task_ptr->func == NULL) {
            break;
        }
        task_ptr->func(task_ptr->this);
    }
    return NULL;
}

Любая помощь приветствуется, большое спасибо уже заранее! Лоуренс

1 Ответ

0 голосов
/ 08 мая 2018

Проблема в том, что вы используете адреса для объектов в стеке после того, как их больше нет. У вас есть это:

for (int i = 0; i < NTASKS; i++) {
    fooTask_t task;
    task.tid = i;
    fooInit(&task);
    td[i] = task;
}

&task такой указатель. Это легко исправить, так как у вас уже есть место, где вы храните задачу. Вот лучший код:

for (int i = 0; i < NTASKS; i++) {
    fooTask_t task;
    task.tid = i;
    td[i] = task;
    fooInit(&td[i]);
}

Второе место, где это происходит, здесь:

for (int i = 0; i < NTASKS; i++) {
printf("Sending dummy task %d\n", i); 
    task_t dummy_task;
    dummy_task.this = & dummy_task;
    dummy_task.func = NULL;
    mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);
}

Поскольку фиктивные задачи не нужно нигде сохранять отдельно, вы можете убрать объявление dummy_task, чтобы оно сохранялось в течение всего выполнения main:

task_t dummy_task;
dummy_task.this = & dummy_task;
dummy_task.func = NULL;

//send dummy tasks with func==NULL to terminate threads
for (int i = 0; i < NTASKS; i++) {
fprintf(stderr, "Sending dummy task %d\n", i); 
    mq_send(task_queue, (const char * ) & dummy_task, sizeof(task_t), 0);
}

После этого все еще есть проблемы с синхронизацией в вашем алгоритме, но это отдельная проблема, которую я оставляю вам.

Я изменил печать с stdout на stderr, так как первый из них буферизован и потерян при сбое программы.

...