Как можно разрушить барьеры, как только вернется pthread_barrier_wait? - PullRequest
11 голосов
/ 04 мая 2011

Этот вопрос основан на:

Когда безопасно разрушить барьер pthread?

и недавний отчет об ошибке glibc:

http://sourceware.org/bugzilla/show_bug.cgi?id=12674

Я не уверен насчет проблемы семафоров, о которой сообщалось в glibc, но, предположительно, она должна быть действительной, чтобы уничтожить барьер, как только pthread_barrier_wait вернется, согласно приведенному выше связанному вопросу.(Обычно поток, получивший PTHREAD_BARRIER_SERIAL_THREAD, или «специальный» поток, который уже считал себя «ответственным» за барьерный объект, был бы тем, кто его разрушает.) Основной вариант использования, о котором я могу думать, это когда барьериспользуется для синхронизации использования данных новым потоком в стеке потока создания, предотвращая возврат потока создания до тех пор, пока новый поток не будет использовать данные;другие барьеры, вероятно, имеют время жизни, равное времени жизни всей программы или управляемое каким-либо другим объектом синхронизации.

В любом случае, как реализация может обеспечить разрушение барьера (ивозможно, даже удаление карты памяти, в которой оно находится), безопасно, как только pthread_barrier_wait вернется в какой-либо поток?Похоже, что другим потокам, которые еще не вернулись, нужно будет проверить хотя бы некоторую часть объекта барьера, чтобы завершить свою работу и вернуться, подобно тому, как в приведенном выше отчете об ошибке glibc sem_post должен проверить количество официантов.после настройки значения семафора.

Ответы [ 2 ]

7 голосов
/ 05 мая 2011

Я собираюсь предпринять еще одну попытку в этом примере реализации pthread_barrier_wait(), в которой используются функции мьютекса и условных переменных, которые могут быть предоставлены реализацией pthreads. Обратите внимание, что этот пример не пытается учитывать соображения производительности (в частности, когда ожидающие потоки разблокированы, они все повторно сериализуются при выходе из ожидания). Я думаю, что использование чего-то вроде объектов Linux Futex могло бы помочь с проблемами производительности, но Futexes все еще в значительной степени вне моего опыта.

Кроме того, я сомневаюсь, что этот пример правильно обрабатывает сигналы или ошибки (если вообще имеет место в случае сигналов). Но я думаю, что правильная поддержка этих вещей может быть добавлена ​​в качестве упражнения для читателя.

Мой главный страх состоит в том, что в примере может быть состояние гонки или тупик (обработка мьютекса более сложна, чем мне нравится). Также обратите внимание, что это пример, который даже не был скомпилирован. Считайте это псевдокодом. Также имейте в виду, что мой опыт в основном связан с Windows - я рассматриваю это больше как возможность для обучения, чем что-либо еще. Таким образом, качество псевдокода может быть довольно низким.

Однако, за исключением оговорок, я думаю, что это может дать представление о том, как можно решить проблему, заданную в вопросе (т. Е. Как функция pthread_barrier_wait() позволяет объекту pthread_barrier_t, который она использует, быть уничтоженным любым из освобожденных нитей без опасности использования барьерного объекта одной или несколькими нитями на выходе).

Вот так:

/* 
 *  Since this is a part of the implementation of the pthread API, it uses
 *  reserved names that start with "__" for internal structures and functions
 *
 *  Functions such as __mutex_lock() and __cond_wait() perform the same function
 *  as the corresponding pthread API.
 */

// struct __barrier_wait data is intended to hold all the data
//  that `pthread_barrier_wait()` will need after releasing
//  waiting threads.  This will allow the function to avoid
//  touching the passed in pthread_barrier_t object after 
//  the wait is satisfied (since any of the released threads
//   can destroy it)

struct __barrier_waitdata {
    struct __mutex cond_mutex;
    struct __cond cond;

    unsigned waiter_count;
    int wait_complete;
};

struct __barrier {
    unsigned count;

    struct __mutex waitdata_mutex;
    struct __barrier_waitdata* pwaitdata;
};

typedef struct __barrier pthread_barrier_t;



int __barrier_waitdata_init( struct __barrier_waitdata* pwaitdata)
{
    waitdata.waiter_count = 0;
    waitdata.wait_complete = 0;

    rc = __mutex_init( &waitdata.cond_mutex, NULL);
    if (!rc) {
        return rc;
    }

    rc = __cond_init( &waitdata.cond, NULL);
    if (!rc) {
        __mutex_destroy( &pwaitdata->waitdata_mutex);
        return rc;
    }

    return 0;
}




int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
    int rc;

    result = __mutex_init( &barrier->waitdata_mutex, NULL);
    if (!rc) return result;

    barrier->pwaitdata = NULL;
    barrier->count = count;

    //TODO: deal with attr
}



int pthread_barrier_wait(pthread_barrier_t *barrier)
{
    int rc;
    struct __barrier_waitdata* pwaitdata;
    unsigned target_count;

    // potential waitdata block (only one thread's will actually be used)
    struct __barrier_waitdata waitdata; 

    // nothing to do if we only need to wait for one thread...
    if (barrier->count == 1) return PTHREAD_BARRIER_SERIAL_THREAD;

    rc = __mutex_lock( &barrier->waitdata_mutex);
    if (!rc) return rc;

    if (!barrier->pwaitdata) {
        // no other thread has claimed the waitdata block yet - 
        //  we'll use this thread's

        rc = __barrier_waitdata_init( &waitdata);
        if (!rc) {
            __mutex_unlock( &barrier->waitdata_mutex);
            return rc;
        }

        barrier->pwaitdata = &waitdata;
    }

    pwaitdata = barrier->pwaitdata;
    target_count = barrier->count;

    //  all data necessary for handling the return from a wait is pointed to
    //  by `pwaitdata`, and `pwaitdata` points to a block of data on the stack of
    //  one of the waiting threads.  We have to make sure that the thread that owns
    //  that block waits until all others have finished with the information
    //  pointed to by `pwaitdata` before it returns.  However, after the 'big' wait
    //  is completed, the `pthread_barrier_t` object that's passed into this 
    //  function isn't used. The last operation done to `*barrier` is to set 
    //  `barrier->pwaitdata = NULL` to satisfy the requirement that this function
    //  leaves `*barrier` in a state as if `pthread_barrier_init()` had been called - and
    //  that operation is done by the thread that signals the wait condition 
    //  completion before the completion is signaled.

    // note: we're still holding  `barrier->waitdata_mutex`;

    rc = __mutex_lock( &pwaitdata->cond_mutex);
    pwaitdata->waiter_count += 1;

    if (pwaitdata->waiter_count < target_count) {
        // need to wait for other threads

        __mutex_unlock( &barrier->waitdata_mutex);
        do {
            // TODO:  handle the return code from `__cond_wait()` to break out of this
            //          if a signal makes that necessary
            __cond_wait( &pwaitdata->cond,  &pwaitdata->cond_mutex);
        } while (!pwaitdata->wait_complete);
    }
    else {
        // this thread satisfies the wait - unblock all the other waiters
        pwaitdata->wait_complete = 1;

        // 'release' our use of the passed in pthread_barrier_t object
        barrier->pwaitdata = NULL;

        // unlock the barrier's waitdata_mutex - the barrier is  
        //  ready for use by another set of threads
        __mutex_unlock( barrier->waitdata_mutex);

        // finally, unblock the waiting threads
        __cond_broadcast( &pwaitdata->cond);
    }

    // at this point, barrier->waitdata_mutex is unlocked, the 
    //  barrier->pwaitdata pointer has been cleared, and no further 
    //  use of `*barrier` is permitted...

    // however, each thread still has a valid `pwaitdata` pointer - the 
    // thread that owns that block needs to wait until all others have 
    // dropped the pwaitdata->waiter_count

    // also, at this point the `pwaitdata->cond_mutex` is locked, so
    //  we're in a critical section

    rc = 0;
    pwaitdata->waiter_count--;

    if (pwaitdata == &waitdata) {
        // this thread owns the waitdata block - it needs to hang around until 
        //  all other threads are done

        // as a convenience, this thread will be the one that returns 
        //  PTHREAD_BARRIER_SERIAL_THREAD
        rc = PTHREAD_BARRIER_SERIAL_THREAD;

        while (pwaitdata->waiter_count!= 0) {
            __cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
        };

        __mutex_unlock( &pwaitdata->cond_mutex);
        __cond_destroy( &pwaitdata->cond);
        __mutex_destroy( &pwaitdata_cond_mutex);
    }
    else if (pwaitdata->waiter_count == 0) {
        __cond_signal( &pwaitdata->cond);
        __mutex_unlock( &pwaitdata->cond_mutex);
    }

    return rc;
}

17 июля 2011 г.: обновление в ответ на комментарий / вопрос об общих барьерах процесса

Я полностью забыл о ситуации с барьерами, которые разделяются между процессами. И, как вы упомянули, идея, которую я изложил, в этом случае потерпит ужас. У меня нет особого опыта использования разделяемой памяти POSIX, поэтому любые предложения, которые я высказываю , должны сдерживаться скептицизмом .

Подводя итог (для моего блага, если не для кого):

Когда какой-либо из потоков получает управление после возврата pthread_barrier_wait(), барьерный объект должен находиться в состоянии 'init' (однако, последний pthread_barrier_init() для этого объекта установил его). API также подразумевает, что при возврате любого из потоков может произойти одно или несколько из следующих событий:

  • очередной вызов pthread_barrier_wait() для начала нового раунда синхронизации потоков
  • pthread_barrier_destroy() на барьерном объекте
  • память, выделенная для объекта-барьера, может быть освобождена или не разделена, если она находится в области общей памяти.

Это означает, что прежде чем вызов pthread_barrier_wait() позволит любому потоку вернуться, он в значительной степени должен убедиться, что все ожидающие потоки больше не используют барьерный объект в контексте этого вызова. Мой первый ответ обратился к этому, создав «локальный» набор объектов синхронизации (мьютекс и связанную переменную условия) вне объекта барьера, который блокировал бы все потоки. Эти локальные объекты синхронизации были размещены в стеке потока, который сначала вызвал pthread_barrier_wait().

Я думаю, что нечто подобное должно быть сделано для барьеров, которые разделяются процессом. Однако в этом случае простое размещение этих синхронизирующих объектов в стеке потока не подходит (так как другие процессы не имеют доступа). Для общего барьера процесса эти объекты должны быть размещены в общей памяти процесса. Я думаю, что техника, которую я перечислил выше, может применяться аналогично:

  • waitdata_mutex, который управляет «распределением» локальных переменных синхронизации (блок waitdata), будет уже в общей памяти процесса, поскольку он находится в структуре барьера. Конечно, когда барьер установлен на THEAD_PROCESS_SHARED, этот атрибут также необходимо применить к waitdata_mutex
  • когда __barrier_waitdata_init() вызывается для инициализации локальной переменной mutex & condition, он должен будет размещать эти объекты в разделяемой памяти вместо простого использования основанной на стеке переменной waitdata.
  • когда поток 'cleanup' уничтожает мьютекс и переменную условия в блоке waitdata, ему также необходимо очистить выделение памяти для процесса.
  • в случае использования разделяемой памяти должен существовать некоторый механизм, обеспечивающий, чтобы объект разделяемой памяти открывался хотя бы один раз в каждом процессе и закрывался правильное число раз в каждом процессе (но не закрывался полностью до каждый поток в процессе закончен, используя это). Я не продумал, как именно это будет сделано ...

Я думаю, что эти изменения позволят схеме работать с общими для процесса барьерами. последний пункт выше - ключевой элемент, который нужно выяснить. Другой способ состоит в том, как создать имя для объекта общей памяти, который будет содержать «локальный» процесс-общий waitdata. Для этого имени вам понадобятся определенные атрибуты:

  • вы хотите, чтобы хранилище для имени находилось в структуре struct pthread_barrier_t, чтобы все процессы имели к нему доступ; это означает известный предел длины имени
  • вы бы хотели, чтобы имя было уникальным для каждого «экземпляра» набора вызовов на pthread_barrier_wait(), потому что может быть возможным запустить второй раунд ожидания до того, как все потоки пройдут весь путь из ожидание первого раунда (поэтому блок общей памяти процесса, настроенный для waitdata, возможно, еще не освобожден). Так что имя, вероятно, должно основываться на таких вещах, как идентификатор процесса, идентификатор потока, адрес объекта барьера и атомный счетчик.
  • Я не знаю, есть ли последствия для безопасности, чтобы имя было «угадываемым». если это так, нужно добавить некоторую рандомизацию - понятия не имею, сколько. Возможно, вам также понадобится хешировать данные, упомянутые выше, вместе со случайными битами. Как я уже сказал, я действительно понятия не имею, важно это или нет.
1 голос
/ 04 мая 2011

Насколько я понимаю, нет необходимости в pthread_barrier_destroy немедленной операции. Вы можете подождать, пока все потоки, которые все еще находятся в фазе пробуждения, будут разбужены.

Например, у вас может быть атомный счетчик awakening, который изначально настроен на количество пробуждаемых потоков. Затем оно будет уменьшено как последнее действие, прежде чем pthread_barrier_wait вернется. pthread_barrier_destroy тогда может просто вращаться, пока счетчик не упадет до 0.

...