Я собираюсь предпринять еще одну попытку в этом примере реализации pthread_barrier_wait()
, в которой используются функции мьютекса и условных переменных, которые могут быть предоставлены реализацией pthreads. Обратите внимание, что этот пример не пытается учитывать соображения производительности (в частности, когда ожидающие потоки разблокированы, они все повторно сериализуются при выходе из ожидания). Я думаю, что использование чего-то вроде объектов Linux Futex могло бы помочь с проблемами производительности, но Futexes все еще в значительной степени вне моего опыта.
Кроме того, я сомневаюсь, что этот пример правильно обрабатывает сигналы или ошибки (если вообще имеет место в случае сигналов). Но я думаю, что правильная поддержка этих вещей может быть добавлена в качестве упражнения для читателя.
Мой главный страх состоит в том, что в примере может быть состояние гонки или тупик (обработка мьютекса более сложна, чем мне нравится). Также обратите внимание, что это пример, который даже не был скомпилирован. Считайте это псевдокодом. Также имейте в виду, что мой опыт в основном связан с Windows - я рассматриваю это больше как возможность для обучения, чем что-либо еще. Таким образом, качество псевдокода может быть довольно низким.
Однако, за исключением оговорок, я думаю, что это может дать представление о том, как можно решить проблему, заданную в вопросе (т. Е. Как функция pthread_barrier_wait()
позволяет объекту pthread_barrier_t
, который она использует, быть уничтоженным любым из освобожденных нитей без опасности использования барьерного объекта одной или несколькими нитями на выходе).
Вот так:
/*
* Since this is a part of the implementation of the pthread API, it uses
* reserved names that start with "__" for internal structures and functions
*
* Functions such as __mutex_lock() and __cond_wait() perform the same function
* as the corresponding pthread API.
*/
// struct __barrier_wait data is intended to hold all the data
// that `pthread_barrier_wait()` will need after releasing
// waiting threads. This will allow the function to avoid
// touching the passed in pthread_barrier_t object after
// the wait is satisfied (since any of the released threads
// can destroy it)
struct __barrier_waitdata {
struct __mutex cond_mutex;
struct __cond cond;
unsigned waiter_count;
int wait_complete;
};
struct __barrier {
unsigned count;
struct __mutex waitdata_mutex;
struct __barrier_waitdata* pwaitdata;
};
typedef struct __barrier pthread_barrier_t;
int __barrier_waitdata_init( struct __barrier_waitdata* pwaitdata)
{
waitdata.waiter_count = 0;
waitdata.wait_complete = 0;
rc = __mutex_init( &waitdata.cond_mutex, NULL);
if (!rc) {
return rc;
}
rc = __cond_init( &waitdata.cond, NULL);
if (!rc) {
__mutex_destroy( &pwaitdata->waitdata_mutex);
return rc;
}
return 0;
}
int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
int rc;
result = __mutex_init( &barrier->waitdata_mutex, NULL);
if (!rc) return result;
barrier->pwaitdata = NULL;
barrier->count = count;
//TODO: deal with attr
}
int pthread_barrier_wait(pthread_barrier_t *barrier)
{
int rc;
struct __barrier_waitdata* pwaitdata;
unsigned target_count;
// potential waitdata block (only one thread's will actually be used)
struct __barrier_waitdata waitdata;
// nothing to do if we only need to wait for one thread...
if (barrier->count == 1) return PTHREAD_BARRIER_SERIAL_THREAD;
rc = __mutex_lock( &barrier->waitdata_mutex);
if (!rc) return rc;
if (!barrier->pwaitdata) {
// no other thread has claimed the waitdata block yet -
// we'll use this thread's
rc = __barrier_waitdata_init( &waitdata);
if (!rc) {
__mutex_unlock( &barrier->waitdata_mutex);
return rc;
}
barrier->pwaitdata = &waitdata;
}
pwaitdata = barrier->pwaitdata;
target_count = barrier->count;
// all data necessary for handling the return from a wait is pointed to
// by `pwaitdata`, and `pwaitdata` points to a block of data on the stack of
// one of the waiting threads. We have to make sure that the thread that owns
// that block waits until all others have finished with the information
// pointed to by `pwaitdata` before it returns. However, after the 'big' wait
// is completed, the `pthread_barrier_t` object that's passed into this
// function isn't used. The last operation done to `*barrier` is to set
// `barrier->pwaitdata = NULL` to satisfy the requirement that this function
// leaves `*barrier` in a state as if `pthread_barrier_init()` had been called - and
// that operation is done by the thread that signals the wait condition
// completion before the completion is signaled.
// note: we're still holding `barrier->waitdata_mutex`;
rc = __mutex_lock( &pwaitdata->cond_mutex);
pwaitdata->waiter_count += 1;
if (pwaitdata->waiter_count < target_count) {
// need to wait for other threads
__mutex_unlock( &barrier->waitdata_mutex);
do {
// TODO: handle the return code from `__cond_wait()` to break out of this
// if a signal makes that necessary
__cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
} while (!pwaitdata->wait_complete);
}
else {
// this thread satisfies the wait - unblock all the other waiters
pwaitdata->wait_complete = 1;
// 'release' our use of the passed in pthread_barrier_t object
barrier->pwaitdata = NULL;
// unlock the barrier's waitdata_mutex - the barrier is
// ready for use by another set of threads
__mutex_unlock( barrier->waitdata_mutex);
// finally, unblock the waiting threads
__cond_broadcast( &pwaitdata->cond);
}
// at this point, barrier->waitdata_mutex is unlocked, the
// barrier->pwaitdata pointer has been cleared, and no further
// use of `*barrier` is permitted...
// however, each thread still has a valid `pwaitdata` pointer - the
// thread that owns that block needs to wait until all others have
// dropped the pwaitdata->waiter_count
// also, at this point the `pwaitdata->cond_mutex` is locked, so
// we're in a critical section
rc = 0;
pwaitdata->waiter_count--;
if (pwaitdata == &waitdata) {
// this thread owns the waitdata block - it needs to hang around until
// all other threads are done
// as a convenience, this thread will be the one that returns
// PTHREAD_BARRIER_SERIAL_THREAD
rc = PTHREAD_BARRIER_SERIAL_THREAD;
while (pwaitdata->waiter_count!= 0) {
__cond_wait( &pwaitdata->cond, &pwaitdata->cond_mutex);
};
__mutex_unlock( &pwaitdata->cond_mutex);
__cond_destroy( &pwaitdata->cond);
__mutex_destroy( &pwaitdata_cond_mutex);
}
else if (pwaitdata->waiter_count == 0) {
__cond_signal( &pwaitdata->cond);
__mutex_unlock( &pwaitdata->cond_mutex);
}
return rc;
}
17 июля 2011 г.: обновление в ответ на комментарий / вопрос об общих барьерах процесса
Я полностью забыл о ситуации с барьерами, которые разделяются между процессами. И, как вы упомянули, идея, которую я изложил, в этом случае потерпит ужас. У меня нет особого опыта использования разделяемой памяти POSIX, поэтому любые предложения, которые я высказываю , должны сдерживаться скептицизмом .
Подводя итог (для моего блага, если не для кого):
Когда какой-либо из потоков получает управление после возврата pthread_barrier_wait()
, барьерный объект должен находиться в состоянии 'init' (однако, последний pthread_barrier_init()
для этого объекта установил его). API также подразумевает, что при возврате любого из потоков может произойти одно или несколько из следующих событий:
- очередной вызов
pthread_barrier_wait()
для начала нового раунда синхронизации потоков
pthread_barrier_destroy()
на барьерном объекте
- память, выделенная для объекта-барьера, может быть освобождена или не разделена, если она находится в области общей памяти.
Это означает, что прежде чем вызов pthread_barrier_wait()
позволит любому потоку вернуться, он в значительной степени должен убедиться, что все ожидающие потоки больше не используют барьерный объект в контексте этого вызова. Мой первый ответ обратился к этому, создав «локальный» набор объектов синхронизации (мьютекс и связанную переменную условия) вне объекта барьера, который блокировал бы все потоки. Эти локальные объекты синхронизации были размещены в стеке потока, который сначала вызвал pthread_barrier_wait()
.
Я думаю, что нечто подобное должно быть сделано для барьеров, которые разделяются процессом. Однако в этом случае простое размещение этих синхронизирующих объектов в стеке потока не подходит (так как другие процессы не имеют доступа). Для общего барьера процесса эти объекты должны быть размещены в общей памяти процесса. Я думаю, что техника, которую я перечислил выше, может применяться аналогично:
-
waitdata_mutex
, который управляет «распределением» локальных переменных синхронизации (блок waitdata), будет уже в общей памяти процесса, поскольку он находится в структуре барьера. Конечно, когда барьер установлен на THEAD_PROCESS_SHARED
, этот атрибут также необходимо применить к waitdata_mutex
- когда
__barrier_waitdata_init()
вызывается для инициализации локальной переменной mutex & condition, он должен будет размещать эти объекты в разделяемой памяти вместо простого использования основанной на стеке переменной waitdata
.
- когда поток 'cleanup' уничтожает мьютекс и переменную условия в блоке
waitdata
, ему также необходимо очистить выделение памяти для процесса.
- в случае использования разделяемой памяти должен существовать некоторый механизм, обеспечивающий, чтобы объект разделяемой памяти открывался хотя бы один раз в каждом процессе и закрывался правильное число раз в каждом процессе (но не закрывался полностью до каждый поток в процессе закончен, используя это). Я не продумал, как именно это будет сделано ...
Я думаю, что эти изменения позволят схеме работать с общими для процесса барьерами. последний пункт выше - ключевой элемент, который нужно выяснить. Другой способ состоит в том, как создать имя для объекта общей памяти, который будет содержать «локальный» процесс-общий waitdata
. Для этого имени вам понадобятся определенные атрибуты:
- вы хотите, чтобы хранилище для имени находилось в структуре
struct pthread_barrier_t
, чтобы все процессы имели к нему доступ; это означает известный предел длины имени
- вы бы хотели, чтобы имя было уникальным для каждого «экземпляра» набора вызовов на
pthread_barrier_wait()
, потому что может быть возможным запустить второй раунд ожидания до того, как все потоки пройдут весь путь из ожидание первого раунда (поэтому блок общей памяти процесса, настроенный для waitdata
, возможно, еще не освобожден). Так что имя, вероятно, должно основываться на таких вещах, как идентификатор процесса, идентификатор потока, адрес объекта барьера и атомный счетчик.
- Я не знаю, есть ли последствия для безопасности, чтобы имя было «угадываемым». если это так, нужно добавить некоторую рандомизацию - понятия не имею, сколько. Возможно, вам также понадобится хешировать данные, упомянутые выше, вместе со случайными битами. Как я уже сказал, я действительно понятия не имею, важно это или нет.