Следующий код должен создать один поток источника и один поток потребителя.Источник создает случайные числа, добавляет их в кольцевой буфер (push_back ()) и затем записывает их в выходной текстовый файл с именем prod_out.txt.Потребитель «потребляет» эти числа, выталкивая их (pop_front ()), а затем записывая их в выходной текстовый файл cons_out.txt.Для такого вызова ./a.out 1 1 10 20 1 аргументы таковы: будет 1 производитель, 1 потребитель, 10 - емкость кольцевого буфера, будет сгенерировано 20 случайных чисел и начальное число для rand_r равно 1.
Если емкость буфера меньше количества генерируемых случайных чисел, программа записывает только 9 чисел в txt потребителя, тогда как txt производителя просто в порядке.Если они совпадают, то текст txt потребителя не создается, а текст txt производителя пропускает один номер (а именно последний, точно так же, как текст txt потребителя, когда емкость меньше генерируемых чисел).
Это основная функция
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include "prodcons.h"
FILE * fp;
int rc;
circular_buffer* cb;
int fileW, my_random;
pthread_mutex_t Mutex;
pthread_cond_t Condition;
args* t_args;
void* Producers(void* fargs){
printf("I am Producer and I am not locked\n");
args* f_args= fargs;
rc = pthread_mutex_lock(&Mutex);
printf("I am Producer and I am locked\n");
if (rc != 0) {
printf("ERROR: return code from pthread_mutex_lock() is %d\n", rc);
pthread_exit(&rc);
}
printf(" The ruler of all mutexes has passed from here and shall start
creating random numbers\n");
for(int i=0; i < *(f_args->rand_count); i++){
if(i==cb->capacity){
printf(" Your producer king will now sleep\n");
rc = pthread_cond_signal(&Condition);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_signal() is %d\n",
rc);
pthread_exit(&rc);
}
rc = pthread_cond_wait(&Condition, &Mutex);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_wait() is %d\n",
rc);
pthread_exit(&rc);
}
}
my_random =rand_r(f_args->seed)%255;
printf ( "Creating random number %d.\n", my_random);
cb_push_back(cb, &my_random);
if(i==0){
fp=fopen("prod_in.txt", "w");
}else{
fp=fopen("prod_in.txt", "a");
}
fprintf(fp, "Producer %d: %d\n", f_args->ID, my_random);
fclose(fp);
}
printf("I am going to go to signal the consumer.\n");
rc = pthread_cond_signal(&Condition);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_signal() is %d\n", rc);
pthread_exit(&rc);
}
rc = pthread_mutex_unlock(&Mutex);
if (rc != 0) {
printf("ERROR: return code from pthread_mutex_unlock() is %d\n", rc);
pthread_exit(&rc);
}
}
void* Consumer(void* t1){
int *threadId = (int *) t1;
printf("I am Consumer and I am NOT locked\n");
rc = pthread_mutex_lock(&Mutex);
printf("I am Consumer and I am locked\n");
if (rc != 0) {
printf("ERROR: return code from pthread_mutex_lock() is %d\n", rc);
pthread_exit(&rc);
}
if(cb->count == 0){
rc = pthread_cond_wait(&Condition, &Mutex);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_wait() is %d\n", rc);
pthread_exit(&rc);
}
}
int counter = cb-> count;
for (int j=0; j<= counter; j++){
printf("Consumer will now consume with items remaining %d.\n", (int)cb-
>count);
if(j==0){
fp=fopen("cons_out.txt", "w");
}else{
fp=fopen("cons_out.txt", "a");
}
printf("I will now write the following %d, %d inside the
file.\n",*threadId,my_random);
fprintf(fp, "Consumer %d: %d\n",*threadId, my_random);
cb_pop_front(cb,&my_random);
if(cb->count == 0){
rc = pthread_cond_signal(&Condition);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_signal() is %d\n",
rc);
pthread_exit(&rc);
}
rc = pthread_cond_wait(&Condition, &Mutex);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_wait() is %d\n",
rc);
pthread_exit(&rc);
}
}
fclose(fp);
}
rc = pthread_mutex_unlock(&Mutex);
if (rc != 0) {
printf("ERROR: return code from pthread_mutex_unlock() is %d\n", rc);
pthread_exit(&rc);
}
}
int main(int argc, char *argv[]){
if (argc !=6){
printf("ERROR: the program should take five arguments!\n");
exit(-1);
}
if(atoi(argv[1])!= 1 || atoi(argv[2])!= 1){
printf("ERROR:This program only functions for one producer and
consumer.\n");
exit(-1);
}
int main_seed = atoi(argv[5]);
int main_rand = atoi(argv[4]);
int buffer_size = atoi(argv[3]);
cb=(circular_buffer*) malloc(sizeof(struct circular_buffer));
t_args=(args*)malloc(sizeof(struct args));
cb_init(cb, buffer_size, sizeof(int));
t_args->seed= &main_seed;
t_args->rand_count= &main_rand;
t_args->ID=1;
int t1=1;
pthread_t p1, c1;
printf ( "I am the ruler of all mutexes. I now shall initialise the
mutex\n");
rc = pthread_mutex_init(&Mutex, NULL);
if (rc != 0) {
printf("ERROR: return code from pthread_mutex_init() is %d\n", rc);
exit(-1);
}
rc = pthread_cond_init(&Condition, NULL);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_init() is %d\n", rc);
exit(-1);
}
rc = pthread_create(&p1, NULL, Producers, (void*)t_args); //cast σε void????
if (rc != 0) {
printf("ERROR: return code from pthread_create() is %d\n", rc);
exit(-1);
}
rc = pthread_create(&c1, NULL,Consumer, &t1);
if (rc != 0) {
printf("ERROR: return code from pthread_create() is %d\n", rc);
exit(-1);
}
void *status;
rc = pthread_join(p1, &status);
if (rc != 0) {
printf("ERROR: return code from pthread_join() is %d\n", rc);
exit(-1);
}
printf("Main(): Thread %d terminated successfully.\n", *(int *) status);
rc = pthread_join(c1, &status);
if (rc != 0) {
printf("ERROR: return code from pthread_join() is %d\n", rc);
exit(-1);
}
printf("Main(): Thread %d terminated successfully.\n", *(int *) status);
rc = pthread_mutex_destroy(&Mutex);
if (rc != 0) {
printf("ERROR: return code from pthread_mutex_destroy() is %d\n", rc);
exit(-1);
}
rc = pthread_cond_destroy(&Condition);
if (rc != 0) {
printf("ERROR: return code from pthread_cond_destroy() is %d\n", rc);
exit(-1);
}
free(cb);
free(t_args);
return 1;
}
Это заголовочный файл с двумя структурами и функциями циклического буфера
#include "stdio.h"
#include "sys/types.h"
#include "stdlib.h"
#include "string.h"
typedef struct circular_buffer
{
void *buffer; // data buffer
void *buffer_end; // end of data buffer
size_t capacity; // maximum number of items in the buffer
size_t count; // number of items in the buffer
size_t sz; // size of each item in the buffer
void *head; // pointer to head
void *tail; // pointer to tail
}circular_buffer;
typedef struct args
{
int ID; //Unique ID from 1 to n
int* seed; //The seed used for rand_r
int* rand_count; //The number of random generated numbers
}args;
void cb_push_back(circular_buffer *cb, const void *item);
void cb_pop_front(circular_buffer *cb, void *item);
void cb_init(circular_buffer *cb, size_t capacity, size_t sz);
void cb_free(circular_buffer *cb);
В этом файле приведены реализации циклических буферных функций
#include "prodcons.h"
//initialize circular buffer
//capacity: maximum number of elements in the buffer
//sz: size of each element
void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
cb->buffer = malloc(capacity * sz);
if(cb->buffer == NULL){
printf("Could not allocate memory..Exiting! \n");
exit(1);
}
// handle error
cb->buffer_end = (char *)cb->buffer + capacity * sz;
cb->capacity = capacity;
cb->count = 0;
cb->sz = sz;
cb->head = cb->buffer;
cb->tail = cb->buffer;
}
//destroy circular buffer
void cb_free(circular_buffer *cb)
{
free(cb->buffer);
// clear out other fields too, just to be safe
}
//add item to circular buffer
void cb_push_back(circular_buffer *cb, const void *item)
{
if(cb->count == cb->capacity)
{
printf("Access violation. Buffer is full\n");
exit(1);
}
memcpy(cb->head, item, cb->sz);
cb->head = (char*)cb->head + cb->sz;
if(cb->head == cb->buffer_end)
cb->head = cb->buffer;
cb->count++;
}
//remove first item from circular item
void cb_pop_front(circular_buffer *cb, void *item)
{
if(cb->count == 0)
{
printf("Access violation. Buffer is empy\n");
exit(1);
}
memcpy(item, cb->tail, cb->sz);
cb->tail = (char*)cb->tail + cb->sz;
if(cb->tail == cb->buffer_end)
cb->tail = cb->buffer;
cb->count--;
}