Труба: проверка, есть ли что прочитать - PullRequest
0 голосов
/ 29 октября 2019

У меня есть вопрос о чтении / записи с / на pipe при использовании forked процессов. Мне не удалось найти способ эффективно проверить, есть ли что-то, что нужно прочитать по каналу, и избежать блокировки моего кода. Приложение состоит из нескольких child процессов, созданных из одного parent процесса, который ожидает получения нескольких write() операций, отправленных children.

Пока что я читаю, что children посылают так:

MSG received;
int lines_received = 0;
/* the child has been forked and we are in the parent */
while(lines_received != IMAGE_HEIGHT-1) {
    for(i = 0; i < child_processes; i++) {
        if(read(fd[i][0], &received, sizeof(MSG)) == sizeof(MSG)) {
            //store Received elements into larger pixel's array
            //printf("Received row index %d\n", received.row_index);
            memcpy(pixels + received.row_index*IMAGE_WIDTH, received.rowdata, sizeof(float)*IMAGE_WIDTH);
            lines_received++;
        }
    }
}

Это функционально, но кажется крайне неэффективным и явно глупым способом. Какой лучший способ сделать то же самое?

Полный код:

#include <stdio.h>
#include <stdlib.h>

#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>

#define IMAGE_WIDTH     800
#define IMAGE_HEIGHT    800

typedef struct message {
    int     row_index;
    float   rowdata[IMAGE_WIDTH];
} MSG;


int main(int argc, char* argv[]) {

    pid_t   wpid;
    int     status = 0;
    int     i;

    int     child_processes = strtol(argv[1], NULL, 10);
    pid_t   pid;
    int     fd[child_processes][2];
    int     per_child = IMAGE_HEIGHT / child_processes;
    int     last_one_extra = IMAGE_HEIGHT % child_processes; 
    int     y_start, y_end;
    float * pixels;

    pixels = (float*)malloc(sizeof(float)*IMAGE_WIDTH*IMAGE_HEIGHT);

    for(i = 0; i < child_processes; i++) {
        if(pipe(fd[i]) != 0){
            fprintf(stderr, "Pipe Failed");
            exit(-1);
        }
    }

    for (i = 0; i < child_processes; i++) {
        pid = fork();

        if(pid == 0) {
            if(i == child_processes - 1) {
                y_start = i*per_child;
                y_end = i*per_child + per_child + last_one_extra;
            } else {
                y_start = i*per_child;
                y_end = i*per_child + per_child;
            }

            close(fd[i][0]); // child closes read side of pipe
            break;
        } else if (pid > 0) {
            /* the child has been forked and we are in the parent */
            close(fd[i][1]);   //parent closes write side of pipe
        } else {
            printf("Fork failed\n");
            exit(-2);
        }
    }

    if (pid == 0) {
        printf("%d - Y start: %d; Y end: %d\n", getpid(), y_start, y_end);

        int x, y;
        for (y=y_start; y<y_end; y++) {

            MSG send_to_parent;

            send_to_parent.row_index = y;
            for (x=0; x<IMAGE_WIDTH; x++) {
                //compute a value for each point c (x, y) in the complex plane
                send_to_parent.rowdata[x] = y*IMAGE_WIDTH+x;
            }
            //send single line of image
            if(write(fd[i][1], &send_to_parent, sizeof(MSG)) != sizeof(MSG)) {
                printf("Child [%d] Error in Write!\n", getpid());
                exit(-3);
            } else {
                //printf("Child [%d] Written row %d!\n", getpid(), send_to_parent.row_index);
            }
        }

        close(fd[i][1]);   //child closes write side of pipe
        exit(0);
    } else if (pid > 0) {
        MSG received;
        int lines_received = 0;
        /* the child has been forked and we are in the parent */
        while(lines_received != IMAGE_HEIGHT-1) {
            for(i = 0; i < child_processes; i++) {
                if(read(fd[i][0], &received, sizeof(MSG)) == sizeof(MSG)) {
                    //store Received elements into larger pixel's array
                    memcpy(pixels + received.row_index*IMAGE_WIDTH, received.rowdata, sizeof(float)*IMAGE_WIDTH);
                    lines_received++;
                }
            }
        }
        for(i = 0; i < child_processes; i++) {
            close(fd[i][0]);   //parent closes read side of pipe
        }
    }

    //this line becomes useless
    while ((wpid = wait(&status)) > 0); // this way, the father waits for all the child processes 

    return 0;
} 
...