Какой примитив синхронизации я должен использовать здесь? - PullRequest
0 голосов
/ 02 мая 2020
while(1) {
     char message_buffer[SIZE];
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    puts((char *) message);
    return NULL;
}

Приведенный выше пример не является MRE, но он чрезвычайно прост:

У меня есть основной поток с al oop, который постоянно получает сообщения из очереди сообщений. После получения нового сообщения оно сохраняется в локальном буфере message_buffer. Затем создается новый поток, чтобы «позаботиться» о новом сообщении, и, таким образом, адрес буфера сообщений передается в handle_message, который впоследствии выполняет новый поток.


Проблема

Часто 2 потока будут печатать одно и то же сообщение, хотя я могу со 100% уверенностью убедиться, что сообщения в очереди не были одинаковыми.


Я не совсем уверен, но мне кажется, я понимаю, почему это происходит:

говорит, что я посылаю sh 2 разных сообщения в mqueue, и только тогда я начинаю их потреблять.

На первой итерации while l oop сообщение будет получено из очереди и сохранено в message_buffer. Будет создан новый поток, и ему будет передан адрес message_length. Но этот поток может быть недостаточно быстрым, чтобы вывести содержимое буфера в поток до того, как будет использовано следующее сообщение (на следующей итерации l oop), а затем содержимое message_buffer будет переопределено , Таким образом, первый и второй поток теперь печатают одно и то же значение.


Мой вопрос: каков наиболее эффективный способ решить эту проблему? Я довольно плохо знаком с параллельным программированием и многопоточностью / pthreads, и меня немного ошеломляют различные примитивы синхронизации.

Проблемы с мьютексом

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

while(1) {
     char message_buffer[SIZE];
     pthread_mutex_lock(&m);
     ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
     pthred_mutex_unlock(&m);

     if(message_len == -1) { /* error handling... */}

     pthread_t pt1;
     int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
     if(ret) { /* error handling ... */}
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    pthread_mutex_lock(&m);
    strncpy(own_buffer, (char *) message, SIZE);
    pthread_mutex_unlock(&m);
    puts(own_buffer);
    return NULL;
}

Я не думаю, что моя текущая реализация мьютекса верно, поскольку потоки все еще получают повторяющиеся сообщения. Основной поток может заблокировать мьютекс, получить сообщение в буфер, разблокировать мьютекс, порождать поток, но этот поток все еще может зависать, и основной может просто переписать буфер снова (так как новый мьютекс буфера никогда не блокировался новым что делает мою текущую реализацию мьютекса бесполезной? Как мне это преодолеть?

1 Ответ

1 голос
/ 02 мая 2020

Проблема в том, что вы заканчиваете l oop, который содержит message_buffer, прежде чем гарантировать, что поток завершил работу с этой памятью.

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    /****** Can't go beyond here until thread is done with message_buffer. ******/
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    /******* Only now can the caller loop back. ******/

    puts(own_buffer);
    return NULL;
}

Вы можете использовать семафор или аналогичный.

static pthread_mutex_t mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond   = PTHREAD_COND_INITIALIZER;
static int             copied = 0;

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
    if (ret) { /* error handling */ }

    // Wait until threads is done with message_buffer.
    pthread_mutex_lock(&mutex);
    while (!copied) pthread_cond_wait(&cond, &mutex);
    copied = 0;
    pthread_mutex_unlock(&mutex);
}

void * handle_message (void * message) {
    char own_buffer[SIZE];
    strncpy(own_buffer, (char *) message, SIZE);

    // Done with caller's buffer.
    // Signal caller to continue.
    pthread_mutex_lock(&mutex);
    copied = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);

    puts(own_buffer);
    return NULL;
}

(Добавленные чанки эффективно выполняют операции семафора. См. Последний фрагмент этого ответа для более общей реализации c.)

Но есть более простое решение: Сделайте копию до создания темы.

while (1) {
    char message_buffer[SIZE];
    ssize_t message_length = mq_receive(...);
    if (message_len == -1) { /* error handling */ }

    pthread_t pt1;
    int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
    if (ret) { /* error handling */ }
}

void * handle_message (void * message) {
    char * own_buffer = message;
    puts(own_buffer);
    free(own_buffer);
    return NULL;
}
...