В pthread, как надежно передать сигнал другому потоку? - PullRequest
12 голосов
/ 04 августа 2010

Я пытаюсь написать простую программу пула потоков в pthread. Однако, похоже, что pthread_cond_signal не блокирует, что создает проблему. Например, допустим, у меня есть программа «производитель-потребитель»:

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;

void * liberator(void * arg)
{
    // XXX make sure he is ready to be freed
    sleep(1);

    pthread_mutex_lock(&my_cond_m);
    pthread_cond_signal(&my_cond);
    pthread_mutex_unlock(&my_cond_m);

    return NULL;
}

int main()
{
    pthread_t t1;
    pthread_create(&t1, NULL, liberator, NULL);

    // XXX Don't take too long to get ready. Otherwise I'll miss 
    // the wake up call forever
    //sleep(3);

    pthread_mutex_lock(&my_cond_m);
    pthread_cond_wait(&my_cond, &my_cond_m);
    pthread_mutex_unlock(&my_cond_m);

    pthread_join(t1, NULL);

    return 0;
}

Как описано в двух отметках XXX, если я уберу вызовы sleep, то main() может зависнуть, поскольку пропустил звонок для пробуждения из liberator(). Конечно, sleep не очень надежный способ гарантировать это.

В реальной жизни это может быть рабочий поток, сообщающий потоку менеджера, что он готов к работе, или поток менеджера, сообщающий о доступности новой работы.

Как бы вы сделали это надежно в pthread?


Разработка

@ Ответ Бореалида вроде работает, но его объяснение проблемы могло бы быть лучше. Я предлагаю всем, кто смотрит на этот вопрос, прочитать обсуждение в комментариях, чтобы понять, что происходит.

В частности, я бы сам изменил его ответ и пример кода, чтобы сделать это более понятным. (Поскольку оригинальный ответ Бореалида, хотя он был скомпилирован и работал, меня сильно смутил)

// In main
pthread_mutex_lock(&my_cond_m);

// If the flag is not set, it means liberator has not 
// been run yet. I'll wait for him through pthread's signaling 
// mechanism

// If it _is_ set, it means liberator has been run. I'll simply 
// skip waiting since I've already synchronized. I don't need to 
// use pthread's signaling mechanism
if(!flag) pthread_cond_wait(&my_cond, &my_cond_m);

pthread_mutex_unlock(&my_cond_m);

// In liberator thread
pthread_mutex_lock(&my_cond_m);

// Signal anyone who's sleeping. If no one is sleeping yet, 
// they should check this flag which indicates I have already 
// sent the signal. This is needed because pthread's signals 
// is not like a message queue -- a sent signal is lost if 
// nobody's waiting for a condition when it's sent.
// You can think of this flag as a "persistent" signal
flag = 1;
pthread_cond_signal(&my_cond);
pthread_mutex_unlock(&my_cond_m);

Ответы [ 2 ]

7 голосов
/ 04 августа 2010

Используйте переменную синхронизации.

В main:

pthread_mutex_lock(&my_cond_m);
while (!flag) {
    pthread_cond_wait(&my_cond, &my_cond_m);
}
pthread_mutex_unlock(&my_cond_m);

В потоке:

pthread_mutex_lock(&my_cond_m);
flag = 1;
pthread_cond_broadcast(&my_cond);
pthread_mutex_unlock(&my_cond_m);

Для проблемы производитель-потребитель этобыть потребителем, спящим, когда буфер пуст, и производителем, спящим, когда он заполнен.Не забудьте получить блокировку перед тем, как получит доступ к глобальной переменной.

4 голосов
/ 04 августа 2010

Я нашел решение здесь . Для меня немного сложнее понять проблему:

  1. Производители и потребители должны иметь возможность общаться в обоих направлениях. В любом случае недостаточно.
  2. Эта двусторонняя связь может быть упакована в одно условие.

Чтобы проиллюстрировать, упомянутое выше сообщение в блоге продемонстрировало, что это действительно значимое и желательное поведение:

pthread_mutex_lock(&cond_mutex);
pthread_cond_broadcast(&cond):
pthread_cond_wait(&cond, &cond_mutex);
pthread_mutex_unlock(&cond_mutex);

Идея состоит в том, что если и производители, и потребители используют эту логику, то будет безопасно, чтобы кто-то из них спал первым, поскольку каждый из них сможет разбудить другую роль. Другими словами, в типичном сценарии «производитель-потребитель» - если потребителю нужно спать, это потому, что производителю нужно проснуться, и наоборот. Упаковка этой логики в одно условие pthread имеет смысл.

Конечно, приведенный выше код имеет непреднамеренное поведение, когда рабочий поток также пробуждает другой спящий рабочий поток, когда он на самом деле просто хочет разбудить производителя. Это можно решить с помощью простой проверки переменных, как подсказывает @Borealid:

while(!work_available) pthread_cond_wait(&cond, &cond_mutex);

При рабочей трансляции все рабочие потоки будут пробуждены, но один за другим (из-за неявной блокировки мьютекса в pthread_cond_wait). Поскольку один из рабочих потоков будет потреблять работу (установив work_available обратно на false), когда другие рабочие потоки проснутся и фактически приступят к работе, работа будет недоступна, поэтому рабочий снова будет спать.

Вот некоторый прокомментированный код, который я тестировал, для всех, кто интересуется:

// gcc -Wall -pthread threads.c -lpthread

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>

pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;

int * next_work = NULL;
int all_work_done = 0;

void * worker(void * arg)
{
    int * my_work = NULL;

    while(!all_work_done)
    {
        pthread_mutex_lock(&my_cond_m);

        if(next_work == NULL)
        {
            // Signal producer to give work
            pthread_cond_broadcast(&my_cond);

            // Wait for work to arrive
            // It is wrapped in a while loop because the condition 
            // might be triggered by another worker thread intended 
            // to wake up the producer
            while(!next_work && !all_work_done)
                pthread_cond_wait(&my_cond, &my_cond_m);
        }

        // Work has arrived, cache it locally so producer can 
        // put in next work ASAP
        my_work = next_work;
        next_work = NULL;
        pthread_mutex_unlock(&my_cond_m);

        if(my_work)
        {
            printf("Worker %d consuming work: %d\n", (int)(pthread_self() % 100), *my_work);
            free(my_work);
        }
    }

    return NULL;
}

int * create_work()
{
    int * ret = (int *)malloc(sizeof(int));
    assert(ret);
    *ret = rand() % 100;
    return ret;
}

void * producer(void * arg)
{
    int i;

    for(i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&my_cond_m);
        while(next_work != NULL)
        {
            // There's still work, signal a worker to pick it up
            pthread_cond_broadcast(&my_cond);

            // Wait for work to be picked up
            pthread_cond_wait(&my_cond, &my_cond_m);
        }

        // No work is available now, let's put work on the queue
        next_work = create_work();
        printf("Producer: Created work %d\n", *next_work);

        pthread_mutex_unlock(&my_cond_m);
    }

    // Some workers might still be waiting, release them
    pthread_cond_broadcast(&my_cond);

    all_work_done = 1;
    return NULL;
}

int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_create(&t3, NULL, worker, NULL);
    pthread_create(&t4, NULL, worker, NULL);

    producer(NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    return 0;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...