Как настроить неблокирующие именованные каналы на Linux, используя C? - PullRequest
0 голосов
/ 04 мая 2020

У меня 3 процесса и две трубы. Process Producer генерирует символ и отправляет его процессу Buffer через канал ввода. Process Buffer сохраняет данные в массиве, а затем отправляет их для обработки Consumer через канал вывода. Затем потребитель печатает данные. Возможно ли сделать это неблокирующим способом?

В настоящее время процесс Consumer читает только один раз в секунду, а процесс Producer выводит один раз каждые 0,3 секунды. Вызов Consumer блокирует процесс Buffer, и несколько выходов Producer не сохраняются в буфере, оставаясь во входном канале до следующего l oop. В настоящее время программа работает в течение 5 секунд, которую можно изменить с помощью переменной длины в начале функции main ().

#include "stdio.h"//get from and print to console
#include "stdlib.h"//free and malloc
#include "unistd.h"//fork(), exec(), sleep(), and pid_t
#include "sys/types.h"//pid_t
#include <string.h>//strtok
#include "sys/wait.h"//waitpid()
#include <time.h>//time()
#include <fcntl.h>//O_NONBLOCK, fd manipulation
#include <sys/stat.h>//mkfifo()

const int BUFFSIZE = 1000;

void buffer(time_t start, int len, char* buff, const char *ppipe, const char *cpipe);
void producer(time_t start, int len, const char *ppipe);
void consumer(time_t start, int len, const char *cpipe);

void main() {
    int length = 5;//length of program (seconds)
    char *buff = malloc(BUFFSIZE*sizeof(char));
    for(int i = 0;i<BUFFSIZE;i++) {
        buff[i] = '*';
    }
    //create the needed pipes
    const char *pfifo1 = "producer1";
    const char *cfifo1 = "consumer1";
    mkfifo(pfifo1, 0666);
    mkfifo(cfifo1, 0666);

    time_t start = time(NULL);
    srand(start);

    pid_t buf_p = fork();//fork off the buffer
    if(buf_p == 0) {
        buffer(start, length, buff, pfifo1, cfifo1);
    }

    pid_t prod_p = fork();//fork off the producer
    if(prod_p == 0) {
        producer(start, length, max_p, pfifo1);
    }

    pid_t con_p = fork();//fork off the consumer
    if(con_p == 0) {
        consumer(start, length, max_c, cfifo1);
    }

    pid_t wpid;//wait til the processes are finished
    while((wpid=wait(NULL))!=-1) {
    }
    unlink(pfifo1);//delete the pipes
    unlink(cfifo1);
    free(buff);
    printf("All processes terminated.\n");
}

void buffer(time_t start, int len, char *buff, const char *ppipe, const char *cpipe) {
    char input;
    int check;//checks if the pipe has a value
    int exitind = 0;
    int inputind = 0;
    int infd = open(ppipe, O_RDONLY, O_NONBLOCK);
    int exfd = open(cpipe, O_WRONLY, O_NONBLOCK);
    while(time(NULL)-start<len) {//loop until the program time ends
        //get the product from the producers
        if(buff[inputind]=='*') {//if the buffer is not full
            check = read(infd, &input, 1);//read from the input pipe
            if(check !=-1) {//ie there was input in the pipe
                buff[inputind] = input;//add it to the buffer
                inputind=(inputind+1)%BUFFSIZE;//inputind loops.
            }
        }

        //write from the array to the exit pipe
        if(buff[exitind]!='*') {//if the buffer has data
            char output = buff[exitind];
            check = write(exfd, &output, 1);
            if(check!=-1) {
                buff[exitind] = '*';//clear the entry
                exitind=(exitind+1)%BUFFSIZE;//exitind loops.
            }
        }
    }
    for(int i = 0;i<BUFFSIZE;i++) {
        printf("%c ", buff[i]);
        if((i+1)%40 ==0) {
            printf("\n");
        }
    }
    close(infd);
    close(exfd);
    printf("Buffer ended.\n");
    exit(0);
}

//function that each producer process runs.
void producer(time_t start, int len, const char *ppipe) {
    int fd = open(ppipe, O_WRONLY);
    while(time(NULL)-start<len) {//loop until the program time ends
        char output = 'A'+(rand()%26);//generate random character,
        printf("Producer value = %c\n", output);
        write(fd, &output, 1);//write it to the input pipe
        usleep(300000);//0.3 seconds of sleep
    }
    close(fd);
    printf("Producer ended\n");
    exit(0);
}

//function that each consumer process runs.
void consumer(time_t start, int len, const char *cpipe) {
    int fd = open(cpipe, O_RDONLY);
    char input;
    while(time(NULL)-start<len) {//loop until the program time ends
        sleep(1);
        //read a character from the buffer
        read(fd, &input, 1);
        printf("Consumer, value = %c\n", input);
    }
    close(fd);
    printf("Consumer ended\n");
    exit(0);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...