иногда зависает несколько процессов через socketpair - PullRequest
0 голосов
/ 14 августа 2011

Я пытаюсь реализовать что-то, что даст мне решение для:

       | --> cmd3 --> cmd4 -->
cmd2-->|
       | --> cmd5 --> cmd6 -->

и так далее ...

Это многократное выполнение процессов и передача результатов по цепочкамиз других процессов с потоками каждая цепочка команд должна выполняться в другом потоке.Я выбираю socketpair для реализации IPC, потому что канал имеет узкое место с ограничением размера буфера 64K.Когда я тестирую программу с одной цепочкой - она ​​работает как положено, но когда я запускаю основную команду и вывод ее, я отправляю через socketpair для чтения завершения нескольких процессов в каждом потоке - программа застряла (выглядит как тупик)1006 *

Что я делаю не так:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <sys/socket.h>

typedef struct command {
    char** argv;
    int num_children;
    struct command* master_cmd;
    struct command** chains;
    struct command* next;
    int fd;
} command;

void be_child(command* cmd);
int execute_master_command_and_pipe_to_childs(command* cmd, int input);
int run_pipeline_sockets(command *cmd, int input);
void waitfor(int fd);

int main(int argc, char* argv[]) {

    handle_segfault();

    command* cmd1 = (command*) malloc(sizeof(command));
    command* cmd2 = (command*) malloc(sizeof(command));
    command* cmd3 = (command*) malloc(sizeof(command));
    command* cmd4 = (command*) malloc(sizeof(command));
    command* cmd5 = (command*) malloc(sizeof(command));
    command* cmd6 = (command*) malloc(sizeof(command));

    command* chains1[2];

    chains1[0] = cmd3;
    chains1[1] = cmd5;

    char* args1[] = { "cat", "/tmp/test.log", NULL };
    char* args3[] = { "sort", NULL, NULL };
    char* args4[] = { "wc", "-l", NULL };
    char* args5[] = { "wc", "-l", NULL };
    char* args6[] = { "wc", "-l", NULL };

    cmd1->argv = args1;
    cmd2->argv = NULL;
    cmd3->argv = args3;
    cmd4->argv = args4;
    cmd5->argv = args5;
    cmd6->argv = args6;

    cmd1->master_cmd = NULL;
    cmd1->next = NULL;
    cmd1->chains = NULL;
    cmd1->num_children = -1;

    cmd2->master_cmd = cmd1;
    cmd2->chains = chains1;
    cmd2->next = NULL;
    cmd2->num_children = 2;

    cmd3->master_cmd = NULL;
    cmd3->next = cmd4;
    cmd3->chains = NULL;
    cmd3->num_children = -1;

    cmd4->master_cmd = NULL;
    cmd4->next = NULL;
    cmd4->chains = NULL;
    cmd4->num_children = -1;

    cmd5->master_cmd = NULL;
    cmd5->next = cmd6;
    cmd5->chains = NULL;
    cmd5->num_children = -1;

    cmd6->master_cmd = NULL;
    cmd6->next = NULL;
    cmd6->chains = NULL;
    cmd6->num_children = -1;

    int rc = execute_master_command_and_pipe_to_childs(cmd2, -1);

    return 0;
}

int execute_master_command_and_pipe_to_childs(command* cmd, int input) {

    int num_children = cmd->num_children;
    int write_pipes[num_children];
    pthread_t threads[num_children];
    command* master_cmd = cmd->master_cmd;

    pid_t pid;
    int i;

    for (i = 0; i < num_children; i++) {
        int new_pipe[2];
        if (socketpair(AF_LOCAL, SOCK_STREAM, 0, new_pipe) < 0) {
            int errnum = errno;
            fprintf(STDERR_FILENO, "ERROR (%d: %s)\n", errnum,
                    strerror(errnum));
            return EXIT_FAILURE;
        }

        if (cmd->chains[i] != NULL) {
            cmd->chains[i]->fd = new_pipe[0];

            if (pthread_create(&threads[i], NULL, (void *) be_child,
                    cmd->chains[i]) != 0) {
                perror("pthread_create"), exit(1);
            }

            write_pipes[i] = new_pipe[1];
        } else {
            perror("ERROR\n");
        }
    }

    if (input != -1) {
        waitfor(input);
    }

    int pipefd = run_pipeline_sockets(master_cmd, input);

    int buffer[1024];

    int len = 0;
    while ((len = read(pipefd, buffer, sizeof(buffer))) != 0) {
        int j;
        for (j = 0; j < num_children; j++) {
            if (write(write_pipes[j], &buffer, len) != len) {
                fprintf(STDERR_FILENO, "Write failed (child %d)\n", j);
                exit(1);
            }

        }

    }

    close(pipefd);

    for (i = 0; i < num_children; i++) {
        close(write_pipes[i]);
    }

    for (i = 0; i < num_children; i++) {
        if (pthread_join(threads[i], NULL) != 0) {
            perror("pthread_join"), exit(1);
        }
    }

}

void waitfor(int fd) {
    fd_set rfds;
    struct timeval tv;
    int retval;

    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);

    tv.tv_sec = 0;
    tv.tv_usec = 500000;

    retval = select(fd + 1, &rfds, NULL, NULL, &tv);

    if (retval == -1)
        perror("select()");
    else if (retval) {
        printf("Data is available now on: %d\n", fd);
    } else {
        printf("No data on: %d\n", fd);
        ///waitfor(fd);
    }
}

void be_child(command* cmd) {

    printf(
            "fd = %d , argv = %s , args = %s , next = %d , master_cmd = %d , next_chain = %d\n",
            cmd->fd, cmd->argv[0], cmd->argv[1], cmd->next, cmd->master_cmd,
            cmd->chains);

    waitfor(cmd->fd);

    int fd = run_pipeline_sockets(cmd, cmd->fd);

    waitfor(fd);

    int buffer[1024];

    int len = 0;

    while ((len = read(fd, buffer, sizeof(buffer))) != 0) {
        write(STDERR_FILENO, &buffer, len);
    }

    close(cmd->fd);
    close(fd);

}

int run_pipeline_sockets(command *cmd, int input) {
    int pfds[2] = { -1, -1 };
    int pid = -1;

    if (socketpair(AF_LOCAL, SOCK_STREAM, 0, pfds) < 0) {
        int errnum = errno;
        fprintf(STDERR_FILENO, "socketpair failed (%d: %s)\n", errnum,
                strerror(errnum));
        return EXIT_FAILURE;
    }

    if ((pid = fork()) == 0) { /* child */
        if (input != -1) {
            dup2(input, STDIN_FILENO);
            close(input);
        }
        if (pfds[1] != -1) {
            dup2(pfds[1], STDOUT_FILENO);
            close(pfds[1]);
        }
        if (pfds[0] != -1) {
            close(pfds[0]);
        }
        execvp(cmd->argv[0], cmd->argv);
        exit(1);
    } else { /* parent */
        if (input != -1) {
            close(input);
        }
        if (pfds[1] != -1) {
            close(pfds[1]);
        }
        if (cmd->next != NULL) {
            run_pipeline_sockets(cmd->next, pfds[0]);
        } else {
            return pfds[0];
        }
    }
}

void segfault_sigaction(int signal, siginfo_t *si, void *arg) {
    printf("Caught segfault at address %p\n", si->si_addr);
    printf("Caught segfault errno %p\n", si->si_errno);
    exit(0);
}

void handle_segfault(void) {
    struct sigaction sa;

    memset(&sa, 0, sizeof(sigaction));
    sigemptyset(&sa.sa_mask);
    sa.sa_sigaction = segfault_sigaction;
    sa.sa_flags = SA_SIGINFO;

    sigaction(SIGSEGV, &sa, NULL);
}

СПАСИБО !!!

1 Ответ

0 голосов
/ 14 августа 2011

Я бы подошел к этой проблеме с совершенно другой точки зрения: вместо того, чтобы придумывать большую структуру данных для управления деревом каналов, и использовать потоки (где блокировка io в процессе может блокировать его потоки), я бы использовал только процессы.

Я также не вижу, как буфер 64К является вашим узким местом, когда вы используете только буфер 1К.

2 простые функции должны направлять это: (обработка ошибок для краткости опущена и используется функция psudocodey parsecmd(), которая превращает разделенную пробелами строку в вектор аргумента)

int mkproc(char *cmd, int outfd)
{
    Command c = parsecmd(cmd);
    int pipeleft[2];
    pipe(pipeleft);
    if(!fork()){
        close(pipeleft[1]);
        dup2(pipeleft[0], 0);
        dup2(outfd, 1);
        execvp(c.name, c.argv);
    }
    close(pipeleft[0]);
    return pipeleft[1];
}

Mkproc берет fd, на который он пишет, и возвращает то, с чего он будет читать. Таким образом, цепочки действительно легко инициализировать:

int chain_in = mkproc("cat foo.txt", mkproc("sort", mkproc("wc -l", 1)));

следующее:

int mktree(char *cmd, int ofd0, ...)
{
    int piperight[2];
    pipe(piperight);

    int cmdin = mkproc(cmd, piperight[1]);
    close(piperight[1]);
    if(!fork()){
        uchar buf[4096];
        int n;

        while((n=read(piperight[0], buf, sizeof buf))>0){
            va_list ap;
            int fd;
            va_start(ap, ofd0);
            for(fd=ofd0; fd!=-1; fd=va_arg(ap, int)){
                write(fd, buf, n);
            }
            va_end(ap);
        }
    }
    return cmdin;
}

Между ними очень просто построить деревья произвольной сложности, например:

int tree_in = mktree("cat foo.txt", 
                  mktree("rot13",
                      mkproc("uniq", mkproc("wc -l", 1)),
                      mkproc("wc -l", open("out.txt", O_WRONLY)), -1),
                  mkproc("sort", 2), -1);

Это выведет отсортированный файл foo.txt в stderr, количество строк в файле rot13'd-foo.txt в out.txt и количество неповторяющихся строк файла rot13'd foo.txt в stdout.

...