Невозможно получить структуру, отправленную через очередь сообщений POSIX - PullRequest
0 голосов
/ 03 ноября 2019

У меня есть два процесса, которые взаимодействуют с очередями сообщений. Каждый процесс - это отдельный файл. Похоже, что отправитель успешно отправил сообщение в очередь сообщений. К сожалению, процесс получения блокирует и не наблюдает за сообщениями, отправленными через очередь сообщений posix.

Стороннее примечание: я успешно реализовал это с помощью строк (sprintf для создания сообщения в отправителе при использовании sscanf дляразобрать полученное сообщение). Таким образом, я могу подтвердить, что настройка MQ правильная.

С учетом сказанного я следовал в этом посте , чтобы изменить мою реализацию очереди строк сообщений и адаптироваться к структурам.

Вот структура и некоторая информация, которую я хотел бы пройти через очередь сообщений.

typedef struct msg_buffer{
    long mtype;                
    struct msg_info{
        char* shm_name;
        size_t shm_size;
        char* path;
    }msg_info;
}msg_buffer;

В обоих процессах очередь сообщений открыта.

struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MSG;  // 10
attr.mq_msgsize = BUFSIZE;    // 1200
attr.mq_curmsgs = 0;

// sender and receiver both open mq like below
mq = mq_open(MQ_NAME, O_RDWR | O_CREAT, 0666, &attr);

Вот как отправительсоздает структуру и отправляет ее через MQ.

ssize_t handle_with_cache(gfcontext_t *ctx, const char *path, void* arg){
    // Note: char* path sent looks like "/to/some/path.txt";
    //       char* shm_name_buffer[20] = "shm_name_00";

    char* path_copy = malloc(strlen(path) + 1);
    char* shm_name_copy = malloc(strlen(shm_name_buffer) + 1);
    // TODO: fix memory leaks by these char* being copied
    strcpy(path_copy, path);
    strcpy(shm_name_copy, shm_name_buffer);

    struct msg_buffer message = {2, { path_copy, segment_size, shm_name_copy} };

    if(mq_send(mq,(const char*) &message, sizeof(struct msg_info), 0) < 0){
        fprintf(stderr, "Error mq_send: %s.\n", strerror(errno));
    }

    fprintf(stdout, "Send message length is %ld.\n", sizeof(message));
    fprintf(stdout, "path:%s\n", message.msg_info.path);
    fprintf(stdout, "shm_name:%s\n", message.msg_info.shm_name);
    fprintf(stdout, "shm_size:%ld\n", message.msg_info.shm_size);

    mq_getattr(mq, &setAttr);
    fprintf(stdout, "Msgs In Queue: %lu\n", setAttr.mq_curmsgs);
    fprintf(stdout, "Msg Size: %lu\n", setAttr.mq_msgsize);

    return bytes_transferred;
}

Это получатель, он будет прослушивать и ждать сообщения в очереди и попытаться проанализировать информацию:

void *threadCacheProcess(void *thread_id){
    int recMessageSize;
    msg_buffer message;

    while(1){
        // recv message sent from proxy and parse
        recMessageSize = mq_receive(mq, (char *) &message, sizeof(struct msg_info), NULL);

        // block until we get valid message
        if(recMessageSize != -1){
            fprintf(stdout, "Success!\n");
            fprintf(stdout, "T%d Name:%s\n",id, message.msg_info.shm_name);
            fprintf(stdout, "T%d Size:%lu\n",id, message.msg_info.shm_size);
            fprintf(stdout, "T%d path:%s\n",id, message.msg_info.path);
        }
    }
}

Выходы консоли:

Консоль отправителя:

path:/test/path/to/file.txt
shm_name:/shm_name_0
shm_size:8192
Msgs In Queue: 1
Msg Size: 1219

Консоль получателя:

Thread Running...   // No "Success or print message" that was expected. Why?

РЕДАКТИРОВАТЬ: Ive обновил код, так что еслиследует руководству Биджа с вложенным примером. К сожалению, проблема получения все еще сохраняется.

1 Ответ

0 голосов
/ 03 ноября 2019

Посмотрите на определение вашего буфера сообщений. Вы определяете содержание вашего сообщения (msg_info), чтобы оно содержало два указателя , имя_шлема и путь. Эти указатели указывают на память в пространстве процесса отправителя и не являются частью адресного пространства получателя .

typedef struct msg_buffer{
    long mtype;                
    struct msg_info{
        char* shm_name;
        size_t shm_size;
        char* path;
    }msg_info;
}msg_buffer;

Я предлагаю прочитать о Sun-RPC и XDR - вам нужно сериализовать данные в отправителе и десериализовать данные в получателе. Читайте о сериализации или сортировке данных.

Как исправить? Определите максимальное пространство для вашего shm_name и пути и переопределите ваше сообщение, чтобы оно содержало их.

#define MAX_SHM_NAME (32) //pick appropriate values...
#define MAX_PATH (128)
typedef struct msg_buffer{
    long mtype;                
    struct msg_info{
        size_t shm_size;
        char shm_name[MAX_SHM_NAME];
        char path[MAX_PATH];
    }msg_info;
}msg_buffer;

Затем скопируйте значения shm_name и path в функцию сериализации сообщений,

ssize_t handle_with_cache(gfcontext_t *ctx, const char *path, void* arg){
// Note: char* path sent looks like "/to/some/path.txt";
//       char* shm_name_buffer[20] = "shm_name_00";

/* why malloc? malloc allocs memory in local process address space, the other process cannot see it...
char* path_copy = malloc(strlen(path) + 1);
char* shm_name_copy = malloc(strlen(shm_name_buffer) + 1);
// TODO: fix memory leaks by these char* being copied
strcpy(path_copy, path);
strcpy(shm_name_copy, shm_name_buffer);
*/

struct msg_buffer message;
message.mtype = 2;
message.msg_info.thing = segment_size;
//you could omit the null-terminators, since you have defined max length
strncpy(message.msg_info.shm_name, shm_name_buffer, MAX_SHM_NAME-1);
    message.msg_info.shm_name[MAX_SHM_NAME-1] = '\0';
strncpy(message.msg_info.path, path, MAX_PATH-1);
    message.msg_info.path[MAX_PATH-1] = '\0';

//wrong - gotta send whole message...
//if(mq_send(mq,(const char*) &message, sizeof(struct msg_info), 0) < 0)
if(mq_send(mq,(const char*) &message, sizeof(struct msg_buffer), 0) < 0){
    fprintf(stderr, "Error mq_send: %s.\n", strerror(errno));
}

fprintf(stdout, "Send message length is %ld.\n", sizeof(message));
fprintf(stdout, "path:%s\n", message.msg_info.path);
fprintf(stdout, "shm_name:%s\n", message.msg_info.shm_name);
fprintf(stdout, "shm_size:%ld\n", message.msg_info.shm_size);

mq_getattr(mq, &setAttr);
fprintf(stdout, "Msgs In Queue: %lu\n", setAttr.mq_curmsgs);
fprintf(stdout, "Msg Size: %lu\n", setAttr.mq_msgsize);

return bytes_transferred;
}

ВашПолучатель должен извлечь данные таким же образом. Наилучшим подходом будет извлечение данных в локальный буфер.

void *threadCacheProcess(void *thread_id){
ssize_t recMessageSize;
msg_buffer message;
unsigned int pri=0; //message priority

while(1){
    // recv message sent from proxy and parse
    recMessageSize = mq_receive(mq, (char *) &message, sizeof(struct msg_buffer), &pri);

    if( recMessageSize < 0 ) {
        int bad = errno;
        fprintf(stdout, "error: %d:\n", bad); perror(bad);
    }
    // block until we get valid message
    else {
        fprintf(stdout, "size %ld, priority %d\n", recMessageSize,pri);
        fprintf(stdout, "T%d Name:%s\n",id, message.msg_info.shm_name);
        fprintf(stdout, "T%d Size:%lu\n",id, message.msg_info.shm_size);
        fprintf(stdout, "T%d path:%s\n",id, message.msg_info.path);
    }
}
}
...