Обычно вы должны синхронизировать свои потоки, используя условные переменные или семафоры.Но в этом случае pipe
также может помочь синхронизировать потоки, поскольку направление связи остается неизменным, в противном случае вам потребуется 2 канала.
Во-первых, я бы создал канал перед рабочим потоком.создано.Таким образом, основной поток не должен ждать, пока рабочий создаст канал, он может немедленно начать запись в него.Мы можем использовать это поведение для синхронизации потоков, потому что read
будет блокироваться, пока другой конец не напишет что-либо в канал.Таким образом, рабочий поток по существу ждет, пока в канале не появятся данные.
Я немного изменил ваш пример, чтобы продемонстрировать это.Обратите внимание, что я также использую sleep
в основном потоке, но я использую его не для синхронизации потоков, а для того, чтобы показать, что рабочий поток действительно ждет, пока основной поток что-то напишет:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
typedef struct th_data {
int fd[2];
// add other arguments as needed
} th_data;
void *myworker(void *args)
{
int payload;
th_data *data = args;
time_t now;
time_t start, stop;
start = time(NULL);
ssize_t ret = read(data->fd[0], &payload, sizeof payload);
now = time(NULL);
if(ret == -1)
{
perror("read, terminating worker");
pthread_exit(0);
}
stop = time(NULL);
printf("* worker [%zu]: waited %zu seconds, payload: %d\n", now, stop-start, payload);
//////////////////
start = time(NULL);
ret = read(data->fd[0], &payload, sizeof payload);
now = time(NULL);
if(ret == -1)
{
perror("read, terminating worker");
pthread_exit(0);
}
stop = time(NULL);
printf("* worker [%zu]: waited %zu seconds, payload: %d\n", now, stop-start, payload);
//////////////////
start = time(NULL);
ret = read(data->fd[0], &payload, sizeof payload);
now = time(NULL);
if(ret == -1)
{
perror("read, terminating worker");
pthread_exit(0);
}
stop = time(NULL);
printf("* worker [%zu]: waited %zu seconds, payload: %d\n", now, stop-start, payload);
pthread_exit(0);
}
int main(void)
{
pthread_t th;
th_data data;
time_t now;
if(pipe(data.fd) < 0)
{
perror("pipe");
return 1;
}
pthread_create(&th, NULL, myworker, &data);
int payload = 88;
printf("+ Main thread: sleep 1 second\n");
sleep(1);
now = time(NULL);
printf("+ Main thread [%zu], writing...\n", now);
write(data.fd[1], &payload, sizeof payload);
printf("+ Main thread: sleep 2 seconds\n");
sleep(2);
payload = -12;
now = time(NULL);
printf("+ Main thread [%zu], writing...\n", now);
write(data.fd[1], &payload, sizeof payload);
printf("+ Main thread: sleep 3 seconds\n");
sleep(3);
payload = 1024;
now = time(NULL);
printf("+ Main thread [%zu], writing...\n", now);
write(data.fd[1], &payload, sizeof payload);
pthread_join(th, NULL);
return 0;
}
выходные данные этой программы
+ Main thread: sleep 1 second
+ Main thread [1524698241], writing...
+ Main thread: sleep 2 seconds
* worker [1524698241]: waited 1 seconds, payload: 88
+ Main thread [1524698243], writing...
+ Main thread: sleep 3 seconds
* worker [1524698243]: waited 2 seconds, payload: -12
+ Main thread [1524698246], writing...
* worker [1524698246]: waited 3 seconds, payload: 1024
, и, как видно из чисел в квадратных скобках, рабочий поток возобновляет работу сразу после того, как основной поток что-то записывает в канал.Факт, что строки немного не синхронизированы, потому что printf
буферизован.Но число в квадратных скобках говорит вам точно, когда какая часть потоков была выполнена.
Мне не очень нравится этот подход, если честно, если и основной поток, и рабочий должны читатьи пишите друг другу, тогда одной трубы не хватит.В этом случае вам нужен другой канал для другого направления или вам нужна лучшая стратегия синхронизации.Я бы использовал условные переменные, потому что они также очень хорошо работают с несколькими потоками.Аналогичный пример с условными переменными
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
int payload; // shared resource
void *worker(void *args)
{
int i = 0;
int payloads[] = { 1, 3, -9};
size_t len = sizeof payloads / sizeof payloads[0];
int oldpayload;
while(1)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cv, &mutex);
printf("* worker[%d]: main thread signal cond variable, payload is %d\n", i, payload);
oldpayload = payload;
payload = payloads[i % len];
printf("* worker[%d]: setting payload to %d\n", i, payload);
pthread_mutex_unlock(&mutex);
// now waking up master
pthread_cond_signal(&cv);
if(oldpayload == 99)
break;
i++;
}
printf("* worker: My work is done, bye\n");
pthread_exit(0);
}
int main(void)
{
pthread_t th;
pthread_create(&th, NULL, worker, NULL);
int payloads[] = { 19, -12, 110, 1024, 99 };
for(size_t i = 0; i < sizeof payloads / sizeof payloads[0]; ++i)
{
printf("+ main[%zu]: doing some work, setting payload to %d\n", i, payloads[i]);
fflush(stdout);
usleep(10000); // simulation work
payload = payloads[i];
pthread_cond_signal(&cv);
// time for master to wait
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cv, &mutex);
printf("+ main[%zu]: worker set payload to %d\n\n", i, payload);
fflush(stdout);
pthread_mutex_unlock(&mutex);
}
pthread_join(th, NULL);
return 0;
}
И выходные данные для этого:
+ main[0]: doing some work, setting payload to 19
* worker[0]: main thread signal cond variable, payload is 19
* worker[0]: setting payload to 1
+ main[0]: worker set payload to 1
+ main[1]: doing some work, setting payload to -12
* worker[1]: main thread signal cond variable, payload is -12
* worker[1]: setting payload to 3
+ main[1]: worker set payload to 3
+ main[2]: doing some work, setting payload to 110
* worker[2]: main thread signal cond variable, payload is 110
* worker[2]: setting payload to -9
+ main[2]: worker set payload to -9
+ main[3]: doing some work, setting payload to 1024
* worker[3]: main thread signal cond variable, payload is 1024
* worker[3]: setting payload to 1
+ main[3]: worker set payload to 1
+ main[4]: doing some work, setting payload to 99
* worker[4]: main thread signal cond variable, payload is 99
* worker[4]: setting payload to 3
* worker: My work is done, bye
+ main[4]: worker set payload to 3
Здесь следует обратить внимание: переменная payload
- это наш общий ресурс, который читается и записываетсякак основной, так и рабочий поток.Обычно вам понадобится мьютекс для защиты чтения и записи общего ресурса.Обратите внимание, что основной поток не блокирует мьютекс при записи в payload
.Причина, по которой я могу опустить это, связана с условной переменной: я синхронизировал потоки, так что только один поток может одновременно записывать payload
, поэтому я знаю, что когда
payload = payloads[i];
выполняетсяосновной поток, рабочий поток заблокирован и ожидает сигнала переменной условия, поэтому он не записывает payload
одновременно с основным потоком.Конечно, я мог бы использовать второй мьютекс только для этого, но я не думаю, что это необходимо.
Заключительные замечания:
Один из них - правильный способ определения main
три:
int main(void);
int main(int argc, char **argv);
int main(int argc, char *argv[]);
Также имейте в виду, что переменная WorkerThreadRet
указывает на NULL
.Вам нужно было бы выделить память для него перед передачей в поток.
В вашем коде перед выполнением
write(pipe_test[1], &status, sizeof(status));
вы можете инициализировать переменную status
перед операцией записи,в противном случае вы отправляете неинициализированное значение.