Я думаю, что вы правы, а учебник не прав. Ваша проблема с учебником очень похожа на Удаленный вызов процедур, который представляет собой парадигму передачи сообщений, при которой субъект отправляет сообщение другому субъекту и приостанавливается до тех пор, пока первый субъект не получит ответное сообщение. Хотя в случае RP C необязательно, чтобы исходный принимающий актер отвечал; ответ должен быть связан таким образом, чтобы это было не просто случайное поступление сообщения.
Протокол, который вы показали, не делает этого; это просто передача сообщений. Ниже приводится работоспособная реализация. Он создает N пар потоков и пытается перейти из a-> b, затем b-> a, и проверяет, что он получает ожидаемый результат. Это не так. Маленькие программы - отличный способ исследовать эти проблемы. Если вы много делаете, стоит изучить Promela, которая создана для моделирования этих проблем.
#include <assert.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define ISCALE 1000
#define SSCALE 1000
#define MAXT 160
typedef int Sem;
typedef struct Buf Buf;
struct Buf {
int buf;
Sem full;
Sem empty;
};
typedef struct Targ Targ;
struct Targ {
Buf *From;
Buf *To;
int id;
};
static int niter = ISCALE;
static int napt = SSCALE;
static pthread_mutex_t slock;
static pthread_cond_t scond;
static void Wait(Sem *s) {
assert(pthread_mutex_lock(&slock) == 0);
while (*s <= 0) {
assert(pthread_cond_wait(&scond, &slock) == 0);
}
*s -= 1;
assert(pthread_mutex_unlock(&slock) == 0);
}
static void Signal(Sem *s) {
assert(pthread_mutex_lock(&slock) == 0);
*s += 1;
if (*s == 1) {
assert(pthread_cond_broadcast(&scond) == 0);
}
assert(pthread_mutex_unlock(&slock) == 0);
}
static int Tx(Buf *a, int val) {
Wait(&a->empty);
a->buf = val;
Signal(&a->full);
return 0;
}
static int Rx(Buf *a, int *val) {
Wait(&a->full);
*val = a->buf;
Signal(&a->empty);
return 0;
}
static void *BufInit(void) {
Buf *a = malloc(sizeof *a);
if (a) {
a->full = 0;
a->empty = 1;
}
return a;
}
static void *Proc(void *p) {
struct Targ *t = p;
unsigned s = 0;
int i;
int lj = -1, j;
for (i = t->id; i < t->id+niter; i++) {
napt && usleep(rand_r(&s) % napt);
Tx(t->To, i);
Rx(t->From, &j);
if (lj != -1 && j != lj+1) {
fprintf(stderr, "Err: base %d cur %d, prev %d, this %d\n",
t->id, i, lj, j);
t->id = -1;
return 0;
}
lj = j;
}
t->id = i;
return 0;
}
int main(int ac, char **av) {
pthread_t t[MAXT];
void *v[MAXT];
struct Targ c[MAXT];
int nt = 16;
int i, o;
Buf *A[2];
if (!(A[0] = BufInit()) || !(A[1] = BufInit())) {
perror("bufinit");
exit(1);
}
while ((o = getopt(ac, av, "t:n:i:")) != -1) {
switch (o) {
case 't':
nt = strtol(optarg, 0, 0) * 2;
assert(nt > 0 && nt < MAXT);
break;
case 'n': napt = strtol(optarg, 0, 0); break;
case 'i': niter = strtol(optarg, 0, 0); break;
default:
exit(1);
}
}
for (i = 0; i < nt; i++) {
/* partition the numeric ranges so we get unique messages */
c[i].id = (i+1) * niter;
/*
The two processes are simulated by alternating the buffers between
even and odd threads here:
*/
c[i].From = A[(i&1)];
c[i].To = A[!(i&1)];
}
assert(pthread_cond_init(&scond, 0) == 0);
assert(pthread_mutex_init(&slock, 0) == 0);
for (i = 0; i < nt; i++) {
assert(pthread_create(t+i, 0, Proc, c+i) == 0);
}
for (i = 0; i < nt; i++) {
assert(pthread_join(t[i], v+i) == 0);
printf("%d = %d\n", i, c[i].id);
}
exit(0);
}