Во-первых, сводка:
pthread_mutex_lock(&mutex)
:
Если mutex
свободен, то этот поток немедленно его захватывает.
Если захватывается mutex
, то этот поток ожидает, пока mutex
не станет свободным, а затем захватывает его.
pthread_mutex_trylock(&mutex)
:
Еслиmutex
свободен, тогда этот поток захватывает его.
Если захватить mutex
, то вызов немедленно возвращается с EBUSY
.
pthread_mutex_unlock(&mutex)
:
Выпуски mutex
.
pthread_cond_signal(&cond)
:
Пробуждение одного потока в ожидании переменной условия cond
.
pthread_cond_broadcast(&cond)
:
Разбудить все потоки, ожидающие условной переменной cond
.
pthread_cond_wait(&cond, &mutex)
:
Это необходимо вызвать с mutex
grabbed.
Вызывающая нить временно освободит mutex
и будет ждать cond
.
Когда cond
транслируется или сигнализируется, и этот поток оказывается пробужденным, тогда вызывающийпоток сначала перехватит mutex
, а затем вернется из вызова.
Важно отметить, что всегда вызывающий поток либо захватывал mutex
, или ожидает cond
.Между ними нет интервала.
Давайте посмотрим на практический пример кода.Мы создадим его в соответствии с кодами OP.
Сначала мы будем использовать структуру для хранения параметров каждой рабочей функции.Поскольку мы хотим, чтобы мьютекс и переменная условия были разделены между потоками, мы будем использовать указатели.
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
/* Worker function work. */
struct work {
pthread_t thread_id;
pthread_mutex_t *lock; /* Pointer to the mutex to use */
pthread_cond_t *wait; /* Pointer to the condition variable to use */
volatile int *done; /* Pointer to the flag to check */
FILE *out; /* Stream to output to */
long id; /* Identity of this thread */
unsigned long count; /* Number of times this thread iterated. */
};
Рабочая функция потока получает указатель на указанную выше структуру.Каждый поток повторяет цикл один раз, затем ожидает переменную условия.При пробуждении, если флаг done все еще равен нулю, поток повторяет цикл.В противном случае поток завершается.
/* Example worker function. */
void *worker(void *workptr)
{
struct work *const work = workptr;
pthread_mutex_lock(work->lock);
/* Loop as long as *done == 0: */
while (!*(work->done)) {
/* *(work->lock) is ours at this point. */
/* This is a new iteration. */
work->count++;
/* Do the work. */
fprintf(work->out, "Thread %ld iteration %lu\n", work->id, work->count);
fflush(work->out);
/* Wait for wakeup. */
pthread_cond_wait(work->wait, work->lock);
}
/* *(work->lock) is still ours, but we've been told that all work is done already. */
/* Release the mutex and be done. */
pthread_mutex_unlock(work->lock);
return NULL;
}
Для выполнения вышеприведенного нам также понадобится main ():
#ifndef THREADS
#define THREADS 4
#endif
int main(void)
{
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wait = PTHREAD_COND_INITIALIZER;
volatile int done = 0;
struct work w[THREADS];
char *line = NULL, *p;
size_t size = 0;
ssize_t len = 0;
unsigned long total;
pthread_attr_t attrs;
int i, err;
/* The worker functions require very little stack, but the default stack
size is huge. Limit that, to reduce the (virtual) memory use. */
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, 2 * PTHREAD_STACK_MIN);
/* Grab the mutex so the threads will have to wait to grab it. */
pthread_mutex_lock(&lock);
/* Create THREADS worker threads. */
for (i = 0; i < THREADS; i++) {
/* All threads use the same mutex, condition variable, and done flag. */
w[i].lock = &lock;
w[i].wait = &wait;
w[i].done = &done;
/* All threads output to standard output. */
w[i].out = stdout;
/* The rest of the fields are thread-specific. */
w[i].id = i + 1;
w[i].count = 0;
err = pthread_create(&(w[i].thread_id), &attrs, worker, (void *)&(w[i]));
if (err) {
fprintf(stderr, "Cannot create thread %d of %d: %s.\n", i+1, THREADS, strerror(errno));
exit(EXIT_FAILURE); /* Exits the entire process, killing any other threads as well. */
}
}
fprintf(stderr, "The first character on each line controls the type of event:\n");
fprintf(stderr, " e, q exit\n");
fprintf(stderr, " s signal\n");
fprintf(stderr, " b broadcast\n");
fflush(stderr);
/* Let each thread grab the mutex now. */
pthread_mutex_unlock(&lock);
while (1) {
len = getline(&line, &size, stdin);
if (len < 1)
break;
/* Find the first character on the line, ignoring leading whitespace. */
p = line;
while ((p < line + len) && (*p == '\0' || *p == '\t' || *p == '\n' ||
*p == '\v' || *p == '\f' || *p == '\r' || *p == ' '))
p++;
/* Do the operation mentioned */
if (*p == 'e' || *p == 'E' || *p == 'q' || *p == 'Q')
break;
else
if (*p == 's' || *p == 'S')
pthread_cond_signal(&wait);
else
if (*p == 'b' || *p == 'B')
pthread_cond_broadcast(&wait);
}
/* It is time for the worker threads to be done. */
pthread_mutex_lock(&lock);
done = 1;
pthread_mutex_unlock(&lock);
/* To ensure all threads see the state of that flag,
we wake up all threads by broadcasting on the condition variable. */
pthread_cond_broadcast(&wait);
/* Reap all threds. */
for (i = 0; i < THREADS; i++)
pthread_join(w[i].thread_id, NULL);
/* Output the thread statistics. */
total = 0;
for (i = 0; i < THREADS; i++) {
total += w[i].count;
fprintf(stderr, "Thread %ld: %lu events.\n", w[i].id, w[i].count);
}
fprintf(stderr, "Total: %lu events.\n", total);
return EXIT_SUCCESS;
}
Если вы сохраните вышеприведенное как example.c
,вы можете скомпилировать его в example
, используя, например, gcc -Wall -O2 example.c -lpthread -o example
.
. Чтобы получить правильное интуитивное понимание операций, запустите пример в терминале с исходным кодом в окне рядом с ним и посмотритекак выполняется выполнение при вводе данных.
Вы даже можете запускать такие команды, как printf '%s\n' s s s b q | ./example
, чтобы выполнить последовательность событий в быстрой последовательности, или printf 's\ns\ns\nb\nq\n' | ./example
с еще меньшим временем между событиями.
После некоторых экспериментов вы, надеюсь, обнаружите, что не все входные события вызывают соответствующее действие .Это связано с тем, что событие выхода (q
выше) не является синхронным: оно не ожидает завершения всей ожидающей работы, а сообщает потокам о том, что нужно выйти прямо сейчас.Вот почему количество событий может варьироваться даже для одного и того же входа.
(Кроме того, если вы сигнализируете переменную условия и сразу транслируете ее, потоки, как правило, просыпаются только один раз.)
Вы можете уменьшить это, задержав выход, используя, например, (printf '%s\n' s s b s s s ; sleep 1 ; printf 'q\n' ) | ./example
.
Однако есть и лучшие способы.Переменная условия не подходит для счетных событий;это действительно как флаг.Семафор будет работать лучше, но тогда вы должны быть осторожны, чтобы не переполнить семафор;оно может быть только от 0 до SEM_VALUE_MAX
включительно.(Таким образом, вы можете использовать семафор для представления числа отложенных заданий, но, вероятно, не для количества итераций, выполненных каждым / всеми рабочими потока.) Очередь для работы, которую нужно выполнить, в пуле потоковмода, это самый распространенный подход.