Асинхронное двунаправленное перенаправление ввода-вывода для дочернего процесса - PullRequest
4 голосов
/ 04 марта 2012

Я пытаюсь найти обобщенный способ асинхронного двунаправленного перенаправления ввода-вывода дочернего процесса. По сути, я хотел бы создать интерактивный дочерний процесс, который ожидает ввода, и любой вывод должен быть прочитан обратно. Я попытался поэкспериментировать с python.subprocess, создав новый процесс python. Базовый упрощенный пример, который пытались достичь, выглядит следующим образом:

process = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE)
while True:
    output = process.stdout.readline()
    print output
    input = sys.stdin.readline()
    process.stdin.write(input)

и выполнение приведенного выше фрагмента кода просто зависает без вывода. Я попытался запустить с /usr/bash и /usr/bin/irb, но результат все тот же. Я предполагаю, что буферизованный ввод-вывод просто не очень хорошо с перенаправлением ввода-вывода.

Итак, мой вопрос: возможно ли прочитать выходные данные дочернего процесса без очистки буфера или выхода из подпроцесса?

В следующем посте упоминаются сокеты IPC, но для этого мне пришлось бы изменить дочерний процесс, который может оказаться невозможным. Есть ли другой способ добиться этого?

Примечание *** Моя конечная цель - создать процесс REPL сервера, который может взаимодействовать с удаленным веб-клиентом. Хотя приведенный пример относится к Python, моя конечная цель - обернуть все доступные REPL обобщенной оболочкой.


С помощью некоторых предложений в ответах я придумал следующее

#!/usr/bin/python
import subprocess, os, select
proc = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE)
for i in xrange(0,5):
    inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0)
    if not inputready: print "No Data",
    print inputready, outputready, exceptready
    for s in inputready: print s.fileno(),s.readline()
proc.terminate()
print "After Terminating"
for i in xrange(0,5):
    inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0)
    if not inputready: print "No Data",
    print inputready, outputready, exceptready
    for s in inputready: print s.fileno(),s.readline() 

сейчас, хотя программы не находятся в тупике, но, к сожалению, выходных данных нет. Запустив приведенный выше код, я получаю

No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
After Terminating
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []
No Data [] [] []

Просто к вашему сведению, работает Python как

/usr/bin/python 2>&1|tee test.out

, кажется, работает очень хорошо.

Я также придумал код "С". Но результат не отличается.

int kbhit() {
    struct timeval tv;
    fd_set fds;
    tv.tv_sec = tv.tv_usec = 0;
    FD_ZERO(&fds);
    FD_SET(STDIN_FILENO, &fds);
    select(STDIN_FILENO+1, &fds, NULL, NULL, &tv);
    return FD_ISSET(STDIN_FILENO, &fds);
}
void receive(char *str) {
    char ch;
    fprintf(stderr,"IN1\n");
    if(!kbhit()) return;
    fprintf(stderr,"IN2\n");
    fprintf(stderr,"%d\n",kbhit());
    for(;kbhit() && (ch=fgetc(stdin))!=EOF;) {
        fprintf(stderr,"%c,%d",ch,kbhit());
    }
    fprintf(stderr,"Done\n");
}
int main(){
    pid_t pid;
    int rv, pipeP2C[2],pipeC2P[2];  
    pipe(pipeP2C);
    pipe(pipeC2P);
    pid=fork();
    if(pid){
        dup2(pipeP2C[1],1); /* Replace stdout with out side of the pipe */
        close(pipeP2C[0]);  /* Close unused side of pipe (in side) */
        dup2(pipeC2P[0],0); /* Replace stdin with in side of the pipe */
        close(pipeC2P[1]);  /* Close unused side of pipe (out side) */
        setvbuf(stdout,(char*)NULL,_IONBF,0);   /* Set non-buffered output on stdout */
        sleep(2);
        receive("quit()\n");
        wait(&rv);              /* Wait for child process to end */
        fprintf(stderr,"Child exited with a %d value\n",rv);
    }
    else{
        dup2(pipeP2C[0],0); /* Replace stdin with the in side of the pipe */
        close(pipeP2C[1]);  /* Close unused side of pipe (out side) */
        dup2(pipeC2P[1],1); /* Replace stdout with the out side of the pipe */
        close(pipeC2P[0]);  /* Close unused side of pipe (out side) */
        setvbuf(stdout,(char*)NULL,_IONBF,0);   /* Set non-buffered output on stdout */
        close(2), dup2(1,2); /*Redirect stderr to stdout */
        if(execl("/usr/bin/python","/usr/bin/python",NULL) == -1){
            fprintf(stderr,"execl Error!");
            exit(1);
        }
    }
    return 0;
}

Ответы [ 5 ]

1 голос
/ 06 марта 2012

В коде Python, который вы разместили, вы не используете правильные потоки:

inputready, outputready, exceptready = select.select(
    [proc.stdout, proc.stderr], # read list
    [proc.stdout, proc.stderr], # write list
    [proc.stdout, proc.stderr], # error list.
    0)                          # time out.

Я не пытался это исправить, но держу пари, что чтение и запись в один и тот же набор потоков неверны.


В вашей выборке несколько ошибок.Во-первых, исполняемый файл python, который вы запускаете как дочерний процесс, не выдает никаких результатов.Во-вторых, существует условие гонки, поскольку вы можете вызывать select() 5 раз подряд, прежде чем дочерний процесс выдаст выходные данные, и в этом случае вы убьете процесс перед чтением чего-либо.

Я исправил трипроблемы, упомянутые выше (список записи, запуск процесса, который производит вывод и состояние гонки).Попробуйте этот пример и посмотрите, работает ли он для вас:

#!/usr/bin/python
import subprocess, os, select, time

path = "/usr/bin/python"
proc = subprocess.Popen([path, "foo.py"], shell=False,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
for i in xrange(0,5):
    time.sleep(1)
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    if not inputready:
        print "No Data",
    print inputready, outputready, exceptready
    for s in inputready:
        print s.fileno(),s.readline()

proc.terminate()
print "After Terminating"

for i in xrange(0,5):
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    if not inputready:
        print "No Data",
    print inputready, outputready, exceptready
    for s in inputready:
        print s.fileno(),s.readline()

Файл foo.py, который я использовал, содержал это:

#!/usr/bin/python
print "Hello, world!"

Следующая версия (в основном удалена излишняя)вывод для облегчения чтения результатов):

#!/usr/bin/python
import subprocess, os, select, time

path = "/usr/bin/python"
proc = subprocess.Popen([path, "foo.py"], shell=False,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
for i in xrange(0,5):
    time.sleep(1)
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    for s in inputready:
        line = s.readline()
        if line:
            print s.fileno(), line

proc.terminate()
print "After Terminating"

for i in xrange(0,5):
    time.sleep(1)
    inputready, outputready, exceptready = select.select(
        [proc.stdout, proc.stderr], [proc.stdin,],
        [proc.stdout, proc.stderr, proc.stdin], 0)
    for s in inputready:
        line = s.readline()
        if line:
            print s.fileno(), line

Дает следующий вывод:

5 Привет, мир!

После завершения

Обратите внимание, что по какой-то причине использование параметра timeout в select.select() не дало ожидаемых результатов в моей системе, и я вместо этого прибегнул к использованию time.sleep().


Только к вашему сведению, запуск python как

/usr/bin/python 2>&1|tee test.out

, кажется, работает нормально.

Вы не можете получить этот эффект, потому что этот пример все еще дает интерпретатору python управляющий tty.Без управляющего tty интерпретатор python не печатает версию Python и не отображает приглашение >>>.

Примером закрытия может быть что-то вроде следующего.Вы можете заменить /dev/null файлом, содержащим команды для отправки интерпретатору.

/usr/bin/python </dev/null 2>&1|tee test.out

Если вы перенаправляете что-либо , кроме управляющего tty (клавиатура), в качестве стандартного ввода дляпроцесс, вы не получите вывод от интерпретатора Python.Вот почему ваш код не работает.

1 голос
/ 06 марта 2012

Есть разные способы сделать это. Вы можете, например:

  • использовать очереди сообщений SysV и опросить время ожидания в очереди на получение сообщения
  • создать pipe () для дочернего элемента и pipe () для отца, используя флаг O_NONBLOCK, а затем select () в файловых дескрипторах для данных, прибывающих (чтобы даже обрабатывать тайм-ауты, если данные не поступают)
  • используйте socket () AF_UNIX или AF_INET, установите его неблокирующим и выберите () или epoll () для получения данных
  • mmap () MAP_SHARED сегментирует память и сигнализирует другому процессу о получении данных, обратите внимание на общий сегмент с механизмом блокировки.

Я написал образец на C с двойными трубами:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/select.h>
#include <fcntl.h>
#include <signal.h>

#define BUFLEN (6*1024)
#define EXECFILE "/usr/bin/python"

char *itoa(int n, char *s, int b) {
        static char digits[] = "0123456789abcdefghijklmnopqrstuvwxyz";
        int i=0, sign;

        if ((sign = n) < 0)
                n = -n;

        do {
                s[i++] = digits[n % b];
        } while ((n /= b) > 0);

        if (sign < 0)
                s[i++] = '-';
        s[i] = '\0';

        return s;
}

/*
int set_nonblock(int sockfd) { // set socket to non blocking
        int arg,i;

        if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) {
                printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %i\n", sockfd, errno);
                return -1;
        }
        // set O_NONBLOCK flag
        arg |= O_NONBLOCK;
        if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) {
                printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %i\n", sockfd, errno);
                return -1;
        }
        return i;
}

int set_block(int sockfd) { // set socket to blocking
        int arg,i;

        if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) {
                printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %i\n", sockfd, errno);
                return -1;
        }
        // clean O_NONBLOCK flag
        arg &= (~O_NONBLOCK);
        if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) {
                printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %i\n", sockfd, errno);
                return -1;
        }
        return i;
}
*/
int main() {
        FILE *input;
        char slice[BUFLEN];
        int status = 0;
        pid_t pid;
        int err;
        int newfd;
        // if you want you can pass arguments to the program to execute
        // char *const arguments[] = {EXECFILE, "-v", NULL};
        char *const arguments[] = {EXECFILE,  NULL};
        int father2child_pipefd[2];
        int child2father_pipefd[2];
        char *read_data = NULL;
        FILE *retclam;
        fd_set myset;
        int x=1;

        signal(SIGPIPE, SIG_IGN);
        newfd = dup(0);
        input = fdopen(newfd, "r");

        pipe(father2child_pipefd); // Father speaking to child
        pipe(child2father_pipefd); // Child speaking to father

        pid = fork();
        if (pid > 0) { // Father
                close(father2child_pipefd[0]);
                close(child2father_pipefd[1]);

                // Write to the pipe reading from stdin
                retclam = fdopen(child2father_pipefd[0], "r");


                // set the two fd non blocking
                //set_nonblock(0);
                //set_nonblock(child2father_pipefd[0]);
                //set_nonblock(fileno(retclam));

                while(x==1) {
                        // clear the file descriptor set
                        FD_ZERO(&myset);
                        // add the stdin to the set
                        FD_SET(fileno(input), &myset);
                        // add the child pipe to the set
                        FD_SET(fileno(retclam), &myset);

                        // here we wait for data to arrive from stdin or from the child pipe. The last argument is a timeout, if you like
                        err = select(fileno(retclam)+1, &myset, NULL, NULL, NULL);
                        switch(err) {
                        case -1:
                                // Problem with select(). The errno variable knows why
                                //exit(1);
                                x=0;
                                break;
                        case 0:
                                // timeout on select(). Data did not arrived in time, only valid if the last attribute of select() was specified
                                break;
                        default:
                                // data is ready to be read
                                bzero(slice, BUFLEN);
                                if (FD_ISSET(fileno(retclam), &myset)) { // data ready on the child
                                        //set_block(fileno(retclam));
                                        read_data = fgets(slice, BUFLEN, retclam); // read a line from the child (max BUFLEN bytes)
                                        //set_nonblock(fileno(retclam));
                                        if (read_data == NULL) {
                                                //exit(0);
                                                x=0;
                                                break;
                                        }
                                        // write data back to stdout
                                        write (1, slice, strlen(slice));
                                        if(feof(retclam)) {
                                                //exit(0);
                                                x=0;
                                                break;
                                        }
                                        break;
                                }
                                bzero(slice, BUFLEN);
                                if (FD_ISSET(fileno(input), &myset)) { // data ready on stdin
                                        //printf("father\n");
                                        //set_block(fileno(input));
                                        read_data = fgets(slice, BUFLEN, input); // read a line from stdin (max BUFLEN bytes)
                                        //set_nonblock(fileno(input));
                                        if (read_data == NULL) {
                                                //exit (0);
                                                close(father2child_pipefd[1]);
                                                waitpid(pid, &status, 0);
                                                //fclose(input);
                                                break;
                                        }
                                        // write data to the child
                                        write (father2child_pipefd[1], slice, strlen(slice));
                                        /*
                                        if(feof(input)) {
                                                exit(0);
                                        }*/
                                        break;
                                }
                        }
                }

                close(father2child_pipefd[1]);
                fclose(input);
                fsync(1);
                waitpid(pid, &status, 0);

                // child process terminated
                fclose (retclam);

                // Parse output data from child
                // write (1, "you can append somethind else on stdout if you like");
                if (WEXITSTATUS(status) == 0) {
                        exit (0); // child process exited successfully
                }
        }

        if (pid == 0) { // Child
                close (0); // stdin is not needed
                close (1); // stdout is not needed
                // Close the write side of this pipe
                close(father2child_pipefd[1]);
                // Close the read side of this pipe
                close(child2father_pipefd[0]);

                // Let's read on stdin, but this stdin is associated to the read pipe
                dup2(father2child_pipefd[0], 0);
                // Let's speak on stdout, but this stdout is associated to the write pipe
                dup2(child2father_pipefd[1], 1);

                // if you like you can put something back to the father before execve
                //write (child2father_pipefd[1], "something", 9);
                //fsync(child2father_pipefd[1]);
                err = execve(EXECFILE, arguments, NULL);

                // we'll never be here again after execve succeeded!! So we get here only if the execve() failed
                //fprintf(stderr, "Problem executing file %s: %i: %s\n", EXECFILE, err, strerror(errno));
                exit (1);
        }

        if (pid < 0) { // Error
                exit (1);
        }

        fclose(input);

        return 0;
}
0 голосов
/ 09 марта 2012

Если вам нужно управлять сеансом интерпретатора Python, вам, вероятно, будет лучше, если

Кстати, в последнем случае сервер может бытьработать где угодно, и у PyScripter уже есть работающий серверный модуль (клиентский модуль находится на Pascal, его нужно будет перевести).

0 голосов
/ 06 марта 2012

Ваше предположение, что виноват буферизованный ввод / вывод, скорее всего верно. Как вы написали свой цикл, чтение будет блокироваться до тех пор, пока не заполнит требуемый буфер, и вы не сможете обработать любой ввод, пока он не вернется. Это может легко вызвать тупик.

Popen.communicate решает эту проблему, заставляя поток работать с каждым каналом и обеспечивая, чтобы в нем были все данные для записи в stdin, чтобы фактическая запись не могла быть отложена, пока объект файла ожидает буфера заполнить или для объекта файла, который будет очищен / закрыт. Я думаю, что вы могли бы сделать так, чтобы решение, включающее потоки, работало, если вам нужно, но это не совсем асинхронно и, вероятно, не самое простое решение.

Вы можете обойти буферизацию python, не используя файловые объекты, предоставленные Popen, для доступа к каналам, а вместо этого захватывая их fd с помощью метода fileno (). Затем вы можете использовать fd с os.read, os.write и select.select. Функции os.read и os.write не будут выполнять буферизацию, но будут блокироваться, пока не будет прочитан / записан хотя бы один байт. Вы должны убедиться, что канал доступен для чтения / записи, прежде чем вызывать их. Самый простой способ сделать это - использовать select.select () для ожидания всех каналов, которые вы хотите прочитать / записать, и сделать один вызов чтения или записи для каждого канала, который готов, когда select () вернется. Вы должны быть в состоянии найти примеры циклов выбора, если будете искать (они, вероятно, будут использовать сокеты вместо каналов, но принцип тот же). (Кроме того, никогда не выполняйте чтение или запись, не проверив сначала, что они не будут блокироваться, или вы можете столкнуться со случаями, когда вы вызываете взаимоблокировку с дочерним процессом. Вы должны быть готовы к чтению данных, даже если вы этого не сделали. пока написано все что хочешь.)

0 голосов
/ 06 марта 2012

Я использую 2-way io в bash следующим образом:

mkfifo hotleg
mkfifo coldleg

program <coldleg |tee hotleg &

while read LINE; do
 case $LINE in
  *)call_a_function $LINE;;
 esac
done <hotleg |tee coldleg &

(обратите внимание, что вы можете просто использовать «>» вместо «tee», но сначала вы можете захотеть увидеть вывод)

...