Потоковая передача Stdout и Stderr через SSH, манипулирование потоком, а затем печать на локальный Stdout и Stderr - PullRequest
0 голосов
/ 24 января 2019

Я выполняю кучу операций над SSH на удаленной машине, и я передаю их stdout и stderr, а затем передаю их пишущему, который записывает в локальные stdout и stderr вместе с байтовыми буферами.

Непосредственно перед тем, как писатель использует это, я хочу выполнить ряд строковых манипуляций с ним, а затем записать на мой экран и в буфер.До этого момента все отлично работает и денди.

Моя проблема в том, что это больше не поток, он зависает, а затем выводит весь глобус в один фрагмент.Я хочу, чтобы это было в режиме реального времени, поэтому я включил каналы в свои процедуры go, но без улучшения.Ниже приведены мои функции, дайте мне знать, если вы можете найти причину, или, возможно, лучший способ достижения этого.

// отправка

func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, readers chan io.Reader) {
        if filters != nil {
                // filters exist
                // read first 8 bytes
                res := readPipe(8, pipe)

                // get each line from the resulting streamed output
                for _, str := range strings.Split(res, "\n") {
                        if str != "" {
                                out := lineFilterAndReplace(str, filters, replaceFilters)

                                // instantiate an io.Reader obj from the given string
                                outReader := strings.NewReader(out)

                                readers <- outReader
                        }
                }
        } else {
                // filters dont exist
                if len(replaceFilters) > 0 {
                        res := readPipe(8, pipe)

                        for _, str := range strings.Split(res, "\n") {
                                if str != "" {
                                        out := lineReplace(str, replaceFilters)

                                        // instantiate an io.Reader obj from the given string
                                        outReader := strings.NewReader(out)

                                        readers <- outReader
                                }
                        }
                } else {
                        readers <- pipe
                }
        }
}

// получение

    outReaders := make(chan io.Reader)

    go handleStdStream(outFilters, replaceFilters, stdoutIn, outReaders)

    go func() {
            for {
                    pipe := <-outReaders

                    _, errStdout = io.Copy(outWriter, pipe)
            }

            // _, errStdout = io.Copy(outWriter, stdoutIn)
    }()

1 Ответ

0 голосов
/ 24 января 2019

Я не думаю, что вам нужны каналы или goroutines для достижения этой цели. Интерфейсы Writer и Reader уже транслируются; Вы непрерывно потягиваете байты из Reader до тех пор, пока не нажмете EOF или ошибку и не передадите байты до Writer до тех пор, пока не закончите или не получите ошибку. Сама по себе обработка потока не требует параллелизма, поэтому последовательное выполнение одной и той же процедуры вполне уместно.

Вы не должны игнорировать сообщения об ошибках. Если функция или метод возвращает значение ошибки, необходимо проверить его. В случае ввода-вывода вам обычно нужно прекратить чтение из Reader, когда он возвращает ошибку, и вам обычно нужно прекратить запись в Writer, когда он возвращает ошибку. В случае Reader вы также должны проверить специальное значение «error» io.EOF.

Я думаю, что использовать Scanner из пакета bufio лучше, чем пытаться делать свою собственную буферизацию / разбиение. По умолчанию Scanner разбивает ввод на новые строки (LF в стиле Unix или CRLF в стиле DOS). Это также избавляет от необходимости проверять io.EOF, при условии, что вы взаимодействуете только с Reader через Scanner.

Рассмотрим следующую версию handleStdStream:

func handleStdStream(filters []string, replaceFilters map[string]string, pipe io.Reader, w io.Writer) error {
    scanner := bufio.NewScanner(pipe)
    for scanner.Scan() {
        str := scanner.Text()
        if str == "" {
            continue
        }
        out := ""
        if len(filters) != 0 {
            out = lineFilterAndReplace(str, filters, replaceFilters)
        } else {
            out = lineReplace(str, replaceFilters)
        }
        if _, err := w.Write([]byte(out)); err != nil {
            return err
        }
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return nil
}

Вы бы использовали это так:

err := handleStdStream(filters, replaceFilters, pipe, outWriter)
if err != nil {
    // do something, like printing the error to a log or stderr
}
...