У меня есть вопрос о чтении / записи с / на pipe
при использовании forked
процессов. Мне не удалось найти способ эффективно проверить, есть ли что-то, что нужно прочитать по каналу, и избежать блокировки моего кода. Приложение состоит из нескольких child
процессов, созданных из одного parent
процесса, который ожидает получения нескольких write()
операций, отправленных children
.
Пока что я читаю, что children
посылают так:
MSG received;
int lines_received = 0;
/* the child has been forked and we are in the parent */
while(lines_received != IMAGE_HEIGHT-1) {
for(i = 0; i < child_processes; i++) {
if(read(fd[i][0], &received, sizeof(MSG)) == sizeof(MSG)) {
//store Received elements into larger pixel's array
//printf("Received row index %d\n", received.row_index);
memcpy(pixels + received.row_index*IMAGE_WIDTH, received.rowdata, sizeof(float)*IMAGE_WIDTH);
lines_received++;
}
}
}
Это функционально, но кажется крайне неэффективным и явно глупым способом. Какой лучший способ сделать то же самое?
Полный код:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#define IMAGE_WIDTH 800
#define IMAGE_HEIGHT 800
typedef struct message {
int row_index;
float rowdata[IMAGE_WIDTH];
} MSG;
int main(int argc, char* argv[]) {
pid_t wpid;
int status = 0;
int i;
int child_processes = strtol(argv[1], NULL, 10);
pid_t pid;
int fd[child_processes][2];
int per_child = IMAGE_HEIGHT / child_processes;
int last_one_extra = IMAGE_HEIGHT % child_processes;
int y_start, y_end;
float * pixels;
pixels = (float*)malloc(sizeof(float)*IMAGE_WIDTH*IMAGE_HEIGHT);
for(i = 0; i < child_processes; i++) {
if(pipe(fd[i]) != 0){
fprintf(stderr, "Pipe Failed");
exit(-1);
}
}
for (i = 0; i < child_processes; i++) {
pid = fork();
if(pid == 0) {
if(i == child_processes - 1) {
y_start = i*per_child;
y_end = i*per_child + per_child + last_one_extra;
} else {
y_start = i*per_child;
y_end = i*per_child + per_child;
}
close(fd[i][0]); // child closes read side of pipe
break;
} else if (pid > 0) {
/* the child has been forked and we are in the parent */
close(fd[i][1]); //parent closes write side of pipe
} else {
printf("Fork failed\n");
exit(-2);
}
}
if (pid == 0) {
printf("%d - Y start: %d; Y end: %d\n", getpid(), y_start, y_end);
int x, y;
for (y=y_start; y<y_end; y++) {
MSG send_to_parent;
send_to_parent.row_index = y;
for (x=0; x<IMAGE_WIDTH; x++) {
//compute a value for each point c (x, y) in the complex plane
send_to_parent.rowdata[x] = y*IMAGE_WIDTH+x;
}
//send single line of image
if(write(fd[i][1], &send_to_parent, sizeof(MSG)) != sizeof(MSG)) {
printf("Child [%d] Error in Write!\n", getpid());
exit(-3);
} else {
//printf("Child [%d] Written row %d!\n", getpid(), send_to_parent.row_index);
}
}
close(fd[i][1]); //child closes write side of pipe
exit(0);
} else if (pid > 0) {
MSG received;
int lines_received = 0;
/* the child has been forked and we are in the parent */
while(lines_received != IMAGE_HEIGHT-1) {
for(i = 0; i < child_processes; i++) {
if(read(fd[i][0], &received, sizeof(MSG)) == sizeof(MSG)) {
//store Received elements into larger pixel's array
memcpy(pixels + received.row_index*IMAGE_WIDTH, received.rowdata, sizeof(float)*IMAGE_WIDTH);
lines_received++;
}
}
}
for(i = 0; i < child_processes; i++) {
close(fd[i][0]); //parent closes read side of pipe
}
}
//this line becomes useless
while ((wpid = wait(&status)) > 0); // this way, the father waits for all the child processes
return 0;
}